Import code for new DNS controller

It uses the dnsprovider code in the k8s repo, and it looks for
annotations on pods, ingress, nodes & services.
This commit is contained in:
Justin Santa Barbara 2016-07-24 21:59:04 -04:00
parent bcc1980743
commit bdb8e77079
15 changed files with 1444 additions and 0 deletions

View File

@ -35,6 +35,8 @@ gofmt:
gofmt -w -s upup/pkg/
gofmt -w -s protokube/cmd
gofmt -w -s protokube/pkg
gofmt -w -s dns-controller/cmd
gofmt -w -s dns-controller/pkg
kops-tar: gocode
rm -rf .build/kops/tar
@ -99,3 +101,19 @@ protokube-push: protokube-image
dns-controller-gocode:
go install k8s.io/kops/dns-controller/cmd/dns-controller
dns-controller-builder-image:
docker build -f images/dns-controller-builder/Dockerfile -t dns-controller-builder .
dns-controller-build-in-docker: dns-controller-builder-image
docker run -it -v `pwd`:/src dns-controller-builder /onbuild.sh
dns-controller-image: dns-controller-build-in-docker
docker build -t ${DOCKER_REGISTRY}/dns-controller:1.3 -f images/dns-controller/Dockerfile .
dns-controller-push: dns-controller-image
docker push ${DOCKER_REGISTRY}/dns-controller:1.3

17
dns-controller/README.md Normal file
View File

@ -0,0 +1,17 @@
dns-controller creates DNS records
In the bring-up of a new cluster, protokube has already ensured that we have an etcd cluster and an apiserver. It also
sets up DNS records for the etcd nodes (this is a much simpler problem, because we have a 1:1 mapping from an etcd
node to a DNS name.)
However, none of the nodes can reach the api server to register. Nor can end-users reach the API. In future
we might expose the API server as a normal service via Type=LoadBalancer or via a normal Ingress, but for now
we just expose it via DNS.
The dns-controller recognizes annotations on nodes, pods, services and ingress objects.
`dns.alpha.kubernetes.io/external` will set up records for accessing the resource externally
`dns.alpha.kubernetes.io/internal` will set up records for accessing the resource internally
The syntax is a comma separated list of fully qualified domain names.

View File

@ -0,0 +1,99 @@
package main
import (
"flag"
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dns-controller/pkg/watchers"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/core/v1"
client_extensions "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1"
kubectl_util "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"os"
_ "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/aws/route53"
_ "k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns"
)
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
)
// main wires the DNS controller together: it parses flags, builds the
// Kubernetes core and extensions clients, initializes the selected DNS
// provider, starts one watcher per resource type (node, pod, service,
// ingress), and then blocks in the DNSController's reconcile loop.
func main() {
	dnsProviderId := "aws-route53"
	flags.StringVar(&dnsProviderId, "dns", dnsProviderId, "DNS provider we should use (aws-route53, google-clouddns)")

	// Trick to avoid 'logging before flag.Parse' warning
	flag.CommandLine.Parse([]string{})

	flag.Set("logtostderr", "true")

	flags.AddGoFlagSet(flag.CommandLine)

	clientConfig := kubectl_util.DefaultClientConfig(flags)

	// NOTE(review): os.Args includes the program name; pflag usually takes
	// os.Args[1:] — confirm this is intentional.
	flags.Parse(os.Args)

	config, err := clientConfig.ClientConfig()
	if err != nil {
		glog.Errorf("error building client configuration: %v", err)
		os.Exit(1)
	}

	kubeClient, err := client.NewForConfig(config)
	if err != nil {
		glog.Fatalf("error building REST client: %v", err)
	}

	kubeExtensionsClient, err := client_extensions.NewForConfig(config)
	if err != nil {
		glog.Fatalf("error building extensions REST client: %v", err)
	}

	// The provider must have been linked in via a blank import above
	dnsProvider, err := dnsprovider.GetDnsProvider(dnsProviderId, nil)
	if err != nil {
		glog.Errorf("Error initializing DNS provider %q: %v", dnsProviderId, err)
		os.Exit(1)
	}
	if dnsProvider == nil {
		glog.Errorf("DNS provider was nil %q: %v", dnsProviderId, err)
		os.Exit(1)
	}

	dnsController, err := dns.NewDNSController(dnsProvider)
	if err != nil {
		glog.Errorf("Error building DNS controller: %v", err)
		os.Exit(1)
	}

	nodeController, err := watchers.NewNodeController(kubeClient, dnsController)
	if err != nil {
		glog.Errorf("Error building node controller: %v", err)
		os.Exit(1)
	}

	podController, err := watchers.NewPodController(kubeClient, dnsController)
	if err != nil {
		glog.Errorf("Error building pod controller: %v", err)
		os.Exit(1)
	}

	serviceController, err := watchers.NewServiceController(kubeClient, dnsController)
	if err != nil {
		glog.Errorf("Error building service controller: %v", err)
		os.Exit(1)
	}

	ingressController, err := watchers.NewIngressController(kubeExtensionsClient, dnsController)
	if err != nil {
		glog.Errorf("Error building ingress controller: %v", err)
		os.Exit(1)
	}

	// Each watcher feeds records into its own scope on dnsController
	go nodeController.Run()
	go podController.Run()
	go serviceController.Run()
	go ingressController.Run()

	// Blocks until the controller is stopped
	dnsController.Run()
}

View File

@ -0,0 +1,20 @@
package dns
// Context represents a state of the world for DNS.
// It is grouped by scopes & named keys, and controllers will replace those groups.
// The DNS controller will then merge all those record sets, resolve aliases etc,
// and then call into a dns backend to match the desired state of the world.
type Context interface {
	// CreateScope creates a new scope, which holds a set of records.
	// MarkReady must be called on every scope before any changes will be applied.
	// Records from all the scopes will be merged.
	CreateScope(name string) (Scope, error)
}

// Scope is a named group of records, populated by a single controller.
type Scope interface {
	// Replace sets the records for recordName to the provided set of records.
	Replace(recordName string, records []Record)

	// MarkReady should be called when a controller has populated all the records for a particular scope
	MarkReady()
}

View File

@ -0,0 +1,466 @@
package dns
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kops/upup/pkg/fi/utils"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"sort"
"strings"
"sync"
"sync/atomic"
)
// DNSController applies the desired DNS state to the DNS backend
type DNSController struct {
	util.Stoppable

	// zones is the DNS provider
	zones dnsprovider.Zones

	// mutex protects the following mutable state
	mutex sync.Mutex

	// scopes is a map for each top-level grouping (e.g. "node", "pod")
	scopes map[string]*DNSControllerScope

	// lastSuccessfulSnapshot is the last snapshot we were able to apply to DNS.
	// This lets us perform incremental updates to DNS.
	lastSuccessfulSnapshot *snapshot

	// changeCount is a change-counter, which helps us avoid computation when nothing has changed.
	// It is incremented (atomically) whenever any scope's records are replaced.
	changeCount uint64
}

// DNSController is a Context
var _ Context = &DNSController{}

// DNSControllerScope is a group of record objects owned by one controller.
type DNSControllerScope struct {
	// ScopeName is the string id for this scope
	ScopeName string

	parent *DNSController

	// mutex protected the following mutable state
	mutex sync.Mutex

	// Ready is set if the populating controller has performed an initial synchronization of records
	Ready bool

	// Records is the map of actual records for this scope, keyed by object name
	Records map[string][]Record
}

// DNSControllerScope is a Scope
var _ Scope = &DNSControllerScope{}
// NewDNSController creates a DNSController backed by the given DNS provider.
// The provider must be non-nil and must support hosted zones.
func NewDNSController(provider dnsprovider.Interface) (*DNSController, error) {
	if provider == nil {
		return nil, fmt.Errorf("must pass provider")
	}

	zones, ok := provider.Zones()
	if !ok {
		return nil, fmt.Errorf("DNS provider does not support zones")
	}

	return &DNSController{
		scopes: make(map[string]*DNSControllerScope),
		zones:  zones,
	}, nil
}
// Run starts the DnsController and blocks until Stop is called.
func (c *DNSController) Run() {
	glog.Infof("starting DNS controller")

	stopCh := c.StopChannel()
	go c.runWatcher(stopCh)

	<-stopCh
	glog.Infof("shutting down DNS controller")
}

// runWatcher repeatedly reconciles desired DNS state via runOnce until
// stop is requested. Failures are retried after 10s; successful passes
// are debounced by 5s.
func (c *DNSController) runWatcher(stopCh <-chan struct{}) {
	for {
		err := c.runOnce()
		if c.StopRequested() {
			glog.Infof("exiting dns controller loop")
			return
		}
		if err != nil {
			glog.Warningf("Unexpected error in DNS controller, will retry: %v", err)
			time.Sleep(10 * time.Second)
		} else {
			// Simple debouncing; DNS servers are typically pretty slow anyway
			time.Sleep(5 * time.Second)
		}
	}
}
// snapshot is a point-in-time copy of the desired DNS state.
type snapshot struct {
	// changeCount is the DNSController.changeCount value at capture time
	changeCount uint64

	// records holds all non-alias-target records from every scope
	records []Record
	// aliasTargets maps an alias FQDN to the records that expand it
	aliasTargets map[string][]Record

	// recordValues is the normalized (sorted) value list per record key,
	// filled in by runOnce after alias resolution
	recordValues map[recordKey][]string
}

// snapshotIfChangedAndReady captures the current desired state.
// It returns nil when nothing has changed since the last successful apply,
// or when any scope has not yet completed its initial synchronization.
func (c *DNSController) snapshotIfChangedAndReady() *snapshot {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	s := &snapshot{
		changeCount: atomic.LoadUint64(&c.changeCount),
	}

	aliasTargets := make(map[string][]Record)

	if c.lastSuccessfulSnapshot != nil && s.changeCount == c.lastSuccessfulSnapshot.changeCount {
		glog.V(4).Infof("No changes since DNS values last successfully applied")
		return nil
	}

	// First pass: verify readiness and pre-size the records slice
	recordCount := 0
	for _, scope := range c.scopes {
		if !scope.Ready {
			glog.Infof("scope not yet ready: %s", scope.ScopeName)
			return nil
		}
		for _, scopeRecords := range scope.Records {
			recordCount += len(scopeRecords)
		}
	}

	records := make([]Record, 0, recordCount)

	for _, scope := range c.scopes {
		for _, scopeRecords := range scope.Records {
			for i := range scopeRecords {
				r := &scopeRecords[i]
				// Alias targets are expansion data only; they are never
				// written to DNS directly
				if r.AliasTarget {
					aliasTargets[r.FQDN] = append(aliasTargets[r.FQDN], *r)
				} else {
					records = append(records, *r)
				}
			}
		}
	}

	s.records = records
	s.aliasTargets = aliasTargets

	return s
}

// recordKey identifies one record set: a (type, FQDN) pair.
type recordKey struct {
	RecordType RecordType
	FQDN       string
}
// runOnce performs a single reconciliation pass: it snapshots the desired
// state, resolves aliases, diffs against the last successfully-applied
// snapshot, and pushes only the changed/deleted record sets to the backend.
func (c *DNSController) runOnce() error {
	snapshot := c.snapshotIfChangedAndReady()
	if snapshot == nil {
		// Unchanged / not ready
		return nil
	}

	newValueMap := make(map[recordKey][]string)
	{
		// Resolve and build map
		for _, r := range snapshot.records {
			if r.RecordType == RecordTypeAlias {
				// Expand the alias into the concrete records of its target
				aliasRecords := snapshot.aliasTargets[r.Value]
				if len(aliasRecords) == 0 {
					glog.Infof("Alias in record specified %q, but no records were found for that name", r.Value)
				}
				for _, aliasRecord := range aliasRecords {
					key := recordKey{
						RecordType: aliasRecord.RecordType,
						FQDN:       r.FQDN,
					}
					// TODO: Support chains: alias of alias (etc)
					newValueMap[key] = append(newValueMap[key], aliasRecord.Value)
				}
				continue
			} else {
				key := recordKey{
					RecordType: r.RecordType,
					FQDN:       r.FQDN,
				}
				newValueMap[key] = append(newValueMap[key], r.Value)
				continue
			}
		}

		// Normalize: sort values so the diff below is order-insensitive
		for k, values := range newValueMap {
			sort.Strings(values)
			newValueMap[k] = values
		}
		snapshot.recordValues = newValueMap
	}

	var oldValueMap map[recordKey][]string
	if c.lastSuccessfulSnapshot != nil {
		oldValueMap = c.lastSuccessfulSnapshot.recordValues
	}

	op, err := newDNSOp(c.zones)
	if err != nil {
		return err
	}

	// Store a list of all the errors, so that one bad apple doesn't block every other request
	var errors []error

	// Check each hostname for changes and apply them
	for k, newValues := range newValueMap {
		if c.StopRequested() {
			return fmt.Errorf("stop requested")
		}
		oldValues := oldValueMap[k]

		if utils.StringSlicesEqual(newValues, oldValues) {
			glog.V(4).Infof("no change to records for %s", k)
			continue
		}

		ttl := 60
		glog.Infof("Using default TTL of %d seconds", ttl)

		err := op.updateRecords(k, newValues, int64(ttl))
		if err != nil {
			glog.Infof("error updating records for %s: %v", k, err)
			errors = append(errors, err)
		}
	}

	// Look for deleted hostnames
	for k := range oldValueMap {
		if c.StopRequested() {
			return fmt.Errorf("stop requested")
		}

		newValues := newValueMap[k]
		if newValues == nil {
			err := op.deleteRecords(k)
			if err != nil {
				glog.Infof("error deleting records for %s: %v", k, err)
				errors = append(errors, err)
			}
		}
	}

	if len(errors) != 0 {
		// Returning any error prevents the snapshot from becoming the new
		// baseline, so failed keys are retried next pass
		return errors[0]
	}

	// Success!  Store the snapshot as our new baseline
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.lastSuccessfulSnapshot = snapshot

	return nil
}
// dnsOp performs one reconciliation pass against the DNS provider,
// caching the list of hosted zones for the duration of the pass.
type dnsOp struct {
	zonesProvider dnsprovider.Zones
	// zones maps the dot-suffixed zone name to the zone
	zones map[string]dnsprovider.Zone
}

// newDNSOp lists the provider's hosted zones and builds a dnsOp.
func newDNSOp(zonesProvider dnsprovider.Zones) (*dnsOp, error) {
	o := &dnsOp{
		zonesProvider: zonesProvider,
	}

	zones, err := zonesProvider.List()
	if err != nil {
		return nil, fmt.Errorf("error querying for zones: %v", err)
	}

	zoneMap := make(map[string]dnsprovider.Zone)
	for _, zone := range zones {
		// Normalize zone names so lookups in findZone are consistent
		name := EnsureDotSuffix(zone.Name())
		zoneMap[name] = zone
	}
	o.zones = zoneMap

	return o, nil
}
// EnsureDotSuffix returns s as a fully-qualified DNS name, appending a
// trailing dot if one is not already present.
func EnsureDotSuffix(s string) string {
	if strings.HasSuffix(s, ".") {
		return s
	}
	return s + "."
}
// findZone returns the most specific hosted zone that contains fqdn,
// stripping one leading label at a time until a zone matches, or nil if
// no zone contains the name.
func (o *dnsOp) findZone(fqdn string) dnsprovider.Zone {
	zoneName := EnsureDotSuffix(fqdn)
	for {
		if zone := o.zones[zoneName]; zone != nil {
			return zone
		}
		dot := strings.IndexByte(zoneName, '.')
		if dot == -1 {
			// No labels left to strip
			return nil
		}
		// gofmt: no spaces inside the slice expression
		zoneName = zoneName[dot+1:]
	}
}
// deleteRecords removes every resource record matching k's FQDN and record
// type from the zone containing it, applying all removals in one changeset.
func (o *dnsOp) deleteRecords(k recordKey) error {
	glog.V(2).Infof("Deleting all records for %s", k)

	zone := o.findZone(k.FQDN)
	if zone == nil {
		// TODO: Post event into service / pod
		return fmt.Errorf("no suitable zone found for %q", k.FQDN)
	}

	rrsProvider, ok := zone.ResourceRecordSets()
	if !ok {
		return fmt.Errorf("zone does not support resource records %q", zone.Name())
	}

	rrs, err := rrsProvider.List()
	if err != nil {
		return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
	}

	cs := rrsProvider.StartChangeset()

	empty := true
	for _, rr := range rrs {
		// Normalize the provider's record name before comparing
		rrName := EnsureDotSuffix(rr.Name())
		if rrName != k.FQDN {
			glog.V(8).Infof("Skipping delete of record %q (name != %s)", rrName, k.FQDN)
			continue
		}
		if string(rr.Type()) != string(k.RecordType) {
			glog.V(8).Infof("Skipping delete of record %q (type %s != %s)", rrName, rr.Type(), k.RecordType)
			continue
		}
		glog.V(2).Infof("Deleting resource record %s %s", rrName, rr.Type())
		cs.Remove(rr)
		empty = false
	}

	// Don't apply an empty changeset
	if empty {
		return nil
	}

	if err := cs.Apply(); err != nil {
		return fmt.Errorf("error deleting DNS resource records: %v", err)
	}

	return nil
}
// updateRecords creates or replaces the resource record set identified by k
// with newRecords, using the supplied TTL. Any single existing matching
// record set is removed and re-added in the same changeset.
func (o *dnsOp) updateRecords(k recordKey, newRecords []string, ttl int64) error {
	glog.V(2).Infof("Updating records for %s: %v", k, newRecords)

	zone := o.findZone(k.FQDN)
	if zone == nil {
		// TODO: Post event into service / pod
		return fmt.Errorf("no suitable zone found for %q", k.FQDN)
	}

	rrsProvider, ok := zone.ResourceRecordSets()
	if !ok {
		return fmt.Errorf("zone does not support resource records %q", zone.Name())
	}

	rrs, err := rrsProvider.List()
	if err != nil {
		return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
	}

	var existing dnsprovider.ResourceRecordSet
	for _, rr := range rrs {
		// Normalize the record name before comparing, for consistency with
		// deleteRecords: otherwise a provider that returns names without a
		// trailing dot never matches, and stale records are left behind
		rrName := EnsureDotSuffix(rr.Name())
		if rrName != k.FQDN {
			glog.V(8).Infof("Skipping record %q (name != %s)", rrName, k.FQDN)
			continue
		}
		if string(rr.Type()) != string(k.RecordType) {
			glog.V(8).Infof("Skipping record %q (type %s != %s)", rrName, rr.Type(), k.RecordType)
			continue
		}

		if existing != nil {
			glog.Warningf("Found multiple matching records: %v and %v", existing, rr)
		}
		existing = rr
	}

	cs := rrsProvider.StartChangeset()
	if existing != nil {
		cs.Remove(existing)
	}

	glog.V(2).Infof("Updating resource record %s %s", k, newRecords)
	rr := rrsProvider.New(k.FQDN, newRecords, ttl, rrstype.RrsType(k.RecordType))
	cs.Add(rr)

	if err := cs.Apply(); err != nil {
		return fmt.Errorf("error updating resource record %s %s: %v", k.FQDN, rr.Type(), err)
	}

	return nil
}
// recordChange bumps the change counter, signalling that a fresh snapshot
// must be taken on the next reconcile pass.
func (c *DNSController) recordChange() {
	atomic.AddUint64(&c.changeCount, 1)
}

// MarkReady records that the owning controller completed its initial sync.
func (s *DNSControllerScope) MarkReady() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.Ready = true
}

// Replace sets (or, for an empty record list, removes) the records for
// recordName in this scope and notifies the parent controller.
func (s *DNSControllerScope) Replace(recordName string, records []Record) {
	glog.V(2).Infof("Update %s/%s: %v", s.ScopeName, recordName, records)

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if len(records) == 0 {
		delete(s.Records, recordName)
	} else {
		s.Records[recordName] = records
	}

	s.parent.recordChange()
}
// CreateScope creates a scope object.
// A scope name may only be registered once; duplicates are an error.
func (c *DNSController) CreateScope(scopeName string) (Scope, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	s := c.scopes[scopeName]
	if s != nil {
		// We can't support this; otherwise we would need to change Ready to a counter
		// (OK, so we could, but it's probably an error anyway)
		return nil, fmt.Errorf("duplicate scope: %q", scopeName)
	}

	s = &DNSControllerScope{
		ScopeName: scopeName,
		Records:   make(map[string][]Record),
		parent:    c,
		Ready:     false,
	}
	c.scopes[scopeName] = s

	return s, nil
}

View File

@ -0,0 +1,22 @@
package dns
// RecordType is the type of a DNS record (A, CNAME, or the internal alias type).
type RecordType string

const (
	// RecordTypeAlias is unusual: the controller will try to resolve the target locally
	RecordTypeAlias = "_alias"

	RecordTypeA     = "A"
	RecordTypeCNAME = "CNAME"
)

// Record is one desired DNS record.
type Record struct {
	RecordType RecordType
	FQDN       string
	Value      string

	// If AliasTarget is set, this entry will not actually be set in DNS,
	// but will be used as an expansion for Records with type=RecordTypeAlias,
	// where the referring record has Value = our FQDN
	AliasTarget bool
}

View File

@ -0,0 +1,54 @@
package util
import (
"fmt"
"sync"
"github.com/golang/glog"
)
// Stoppable implements the standard stop / shutdown logic
type Stoppable struct {
	// mutex is used to enforce only a single call to Stop is active.
	// Needed because we allow stopping through an http endpoint and
	// allowing concurrent stoppers leads to stack traces.
	// We also use it for lazy-init
	mutex    sync.Mutex
	shutdown bool
	// stopChannel is lazily created and closed by Stop to broadcast shutdown
	stopChannel chan struct{}
}

// StopChannel gets the stopChannel, initializing it if needed.
// The returned channel is closed when Stop is called.
func (s *Stoppable) StopChannel() <-chan struct{} {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.stopChannel == nil {
		s.stopChannel = make(chan struct{})
	}
	return s.stopChannel
}
// Stop stops the controller.
// It closes the stop channel exactly once; a second call returns an error
// instead of panicking on a double-close.
func (s *Stoppable) Stop() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if s.shutdown {
		return fmt.Errorf("shutdown already in progress")
	}

	// We initialize the channel to avoid a race if we Stop before anyone is watching
	if s.stopChannel == nil {
		s.stopChannel = make(chan struct{})
	}
	close(s.stopChannel)
	glog.Infof("shutting down controller")
	s.shutdown = true
	return nil
}
// StopRequested reports whether Stop has been called.
// The read is performed under the mutex: Stop writes s.shutdown while
// holding it, so an unguarded read here is a data race (go test -race).
func (s *Stoppable) StopRequested() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.shutdown
}

View File

@ -0,0 +1,9 @@
package watchers
// AnnotationNameDnsExternal is used to set up a DNS name for accessing the resource from outside the cluster.
// For a service of Type=LoadBalancer, it would map to the external LB hostname or IP.
// The value is a comma-separated list of fully qualified domain names.
const AnnotationNameDnsExternal = "dns.alpha.kubernetes.io/external"

// AnnotationNameDnsInternal is used to set up a DNS name for accessing the resource from inside the cluster.
// This is only supported on Pods currently, and maps to the Internal address.
const AnnotationNameDnsInternal = "dns.alpha.kubernetes.io/internal"

View File

@ -0,0 +1,158 @@
package watchers
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
client_extensions "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/extensions/v1beta1"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
// IngressController watches for Ingress objects with dns labels
type IngressController struct {
	util.Stoppable
	kubeClient *client_extensions.ExtensionsClient
	// scope is this controller's record group on the DNS context
	scope dns.Scope
}

// NewIngressController creates an IngressController registered under the
// "ingress" scope of the given DNS context.
func NewIngressController(kubeClient *client_extensions.ExtensionsClient, dns dns.Context) (*IngressController, error) {
	scope, err := dns.CreateScope("ingress")
	if err != nil {
		return nil, fmt.Errorf("error building dns scope: %v", err)
	}
	c := &IngressController{
		kubeClient: kubeClient,
		scope:      scope,
	}
	return c, nil
}
// Run starts the IngressController and blocks until Stop is called.
func (c *IngressController) Run() {
	glog.Infof("starting ingress controller")

	stopCh := c.StopChannel()
	go c.runWatcher(stopCh)

	<-stopCh
	glog.Infof("shutting down ingress controller")
}
// runWatcher lists all ingress objects, publishes their records and marks the
// scope ready, then watches for changes from the returned resource version.
// If the watch fails it re-lists from scratch; it exits when stopCh is closed.
func (c *IngressController) runWatcher(stopCh <-chan struct{}) {
	runOnce := func() (bool, error) {
		var listOpts api.ListOptions
		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		ingressList, err := c.kubeClient.Ingresses("").List(listOpts)
		if err != nil {
			// Fix spelling: "ingresss" -> "ingresses"
			return false, fmt.Errorf("error listing ingresses: %v", err)
		}
		for i := range ingressList.Items {
			ingress := &ingressList.Items[i]
			glog.V(4).Infof("found ingress: %v", ingress.Name)
			c.updateIngressRecords(ingress)
		}
		c.scope.MarkReady()

		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		listOpts.Watch = true
		listOpts.ResourceVersion = ingressList.ResourceVersion
		watcher, err := c.kubeClient.Ingresses("").Watch(listOpts)
		if err != nil {
			return false, fmt.Errorf("error watching ingresses: %v", err)
		}
		ch := watcher.ResultChan()
		for {
			select {
			case <-stopCh:
				glog.Infof("Got stop signal")
				return true, nil
			case event, ok := <-ch:
				if !ok {
					glog.Infof("ingress watch channel closed")
					return false, nil
				}

				ingress := event.Object.(*v1beta1.Ingress)
				glog.V(4).Infof("ingress changed: %s %v", event.Type, ingress.Name)

				switch event.Type {
				case watch.Added, watch.Modified:
					c.updateIngressRecords(ingress)

				case watch.Deleted:
					c.scope.Replace(ingress.Name, nil)

				default:
					glog.Warningf("Unknown event type: %v", event.Type)
				}
			}
		}
	}

	for {
		stop, err := runOnce()
		if stop {
			return
		}

		if err != nil {
			glog.Warningf("Unexpected error in event watch, will retry: %v", err)
			time.Sleep(10 * time.Second)
		}
	}
}
// updateIngressRecords maps the ingress's load-balancer status endpoints
// onto each host listed in the spec rules, and replaces the ingress's
// entry in the scope with the resulting records.
func (c *IngressController) updateIngressRecords(ingress *v1beta1.Ingress) {
	// Collect the endpoint records (FQDN filled in later per rule host)
	var endpoints []dns.Record
	for i := range ingress.Status.LoadBalancer.Ingress {
		lb := &ingress.Status.LoadBalancer.Ingress[i]

		if lb.Hostname != "" {
			// TODO: Support ELB aliases
			endpoints = append(endpoints, dns.Record{
				RecordType: dns.RecordTypeCNAME,
				Value:      lb.Hostname,
			})
		}

		if lb.IP != "" {
			endpoints = append(endpoints, dns.Record{
				RecordType: dns.RecordTypeA,
				Value:      lb.IP,
			})
		}
	}

	// Cross-product: one record per (rule host, endpoint)
	var records []dns.Record
	for _, rule := range ingress.Spec.Rules {
		if rule.Host == "" {
			continue
		}

		fqdn := dns.EnsureDotSuffix(rule.Host)
		for _, endpoint := range endpoints {
			r := endpoint
			r.FQDN = fqdn
			records = append(records, r)
		}
	}

	c.scope.Replace(ingress.Name, records)
}

View File

@ -0,0 +1,203 @@
package watchers
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/core/v1"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
)
// NodeController watches for nodes
type NodeController struct {
	util.Stoppable
	kubeClient *client.CoreClient
	// scope is this controller's record group on the DNS context
	scope dns.Scope
}

// NewNodeController creates a NodeController registered under the "node"
// scope of the given DNS context.
func NewNodeController(kubeClient *client.CoreClient, dns dns.Context) (*NodeController, error) {
	scope, err := dns.CreateScope("node")
	if err != nil {
		return nil, fmt.Errorf("error building dns scope: %v", err)
	}
	c := &NodeController{
		kubeClient: kubeClient,
		scope:      scope,
	}

	return c, nil
}

// Run starts the NodeController and blocks until Stop is called.
func (c *NodeController) Run() {
	glog.Infof("starting node controller")

	stopCh := c.StopChannel()
	go c.runWatcher(stopCh)

	<-stopCh
	glog.Infof("shutting down node controller")
}
// runWatcher lists all nodes, publishes their alias-target records and marks
// the scope ready, then watches for node changes from the returned resource
// version. If the watch fails it re-lists; it exits when stopCh is closed.
func (c *NodeController) runWatcher(stopCh <-chan struct{}) {
	runOnce := func() (bool, error) {
		var listOpts api.ListOptions
		// Note we need to watch all the nodes, to set up alias targets
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		nodeList, err := c.kubeClient.Nodes().List(listOpts)
		if err != nil {
			return false, fmt.Errorf("error listing nodes: %v", err)
		}
		for i := range nodeList.Items {
			node := &nodeList.Items[i]
			glog.Infof("node: %v", node.Name)
			c.updateNodeRecords(node)
		}
		c.scope.MarkReady()

		// Note we need to watch all the nodes, to set up alias targets
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		listOpts.Watch = true
		listOpts.ResourceVersion = nodeList.ResourceVersion
		watcher, err := c.kubeClient.Nodes().Watch(listOpts)
		if err != nil {
			return false, fmt.Errorf("error watching nodes: %v", err)
		}
		ch := watcher.ResultChan()
		for {
			select {
			case <-stopCh:
				glog.Infof("Got stop signal")
				return true, nil
			case event, ok := <-ch:
				if !ok {
					glog.Infof("node watch channel closed")
					return false, nil
				}

				node := event.Object.(*v1.Node)
				glog.V(4).Infof("node changed: %s %v", event.Type, node.Name)

				switch event.Type {
				case watch.Added, watch.Modified:
					c.updateNodeRecords(node)

				case watch.Deleted:
					c.scope.Replace(node.Name, nil)

				default:
					// Added for consistency with the pod/service/ingress watchers,
					// which log unrecognized event types
					glog.Warningf("Unknown event type: %v", event.Type)
				}
			}
		}
	}

	for {
		stop, err := runOnce()
		if stop {
			return
		}

		if err != nil {
			glog.Warningf("Unexpected error in event watch, will retry: %v", err)
			time.Sleep(10 * time.Second)
		}
	}
}
// updateNodeRecords publishes alias-target records for the node's addresses
// into the node scope. These records are not written to DNS directly
// (AliasTarget is set); they are referenced by RecordTypeAlias records that
// other controllers (e.g. the pod controller) create.
//
// Removed a large block of commented-out code that read a dns label/annotation
// directly from the node: it referenced an undefined constant (LabelNameDns)
// and had been superseded by the alias-target mechanism below.
func (c *NodeController) updateNodeRecords(node *v1.Node) {
	var records []dns.Record

	// Alias targets

	// node/<name>/internal -> InternalIP
	for _, a := range node.Status.Addresses {
		if a.Type != v1.NodeInternalIP {
			continue
		}
		records = append(records, dns.Record{
			RecordType:  dns.RecordTypeA,
			FQDN:        "node/" + node.Name + "/internal",
			Value:       a.Address,
			AliasTarget: true,
		})
	}

	// node/<name>/external -> ExternalIP
	for _, a := range node.Status.Addresses {
		if a.Type != v1.NodeExternalIP {
			continue
		}
		records = append(records, dns.Record{
			RecordType:  dns.RecordTypeA,
			FQDN:        "node/" + node.Name + "/external",
			Value:       a.Address,
			AliasTarget: true,
		})
	}

	c.scope.Replace(node.Name, records)
}

View File

@ -0,0 +1,182 @@
package watchers
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/core/v1"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
"strings"
)
// PodController watches for Pods with dns annotations
type PodController struct {
	util.Stoppable
	kubeClient *client.CoreClient
	// scope is this controller's record group on the DNS context
	scope dns.Scope
}

// NewPodController creates a PodController registered under the "pod" scope
// of the given DNS context.
func NewPodController(kubeClient *client.CoreClient, dns dns.Context) (*PodController, error) {
	scope, err := dns.CreateScope("pod")
	if err != nil {
		return nil, fmt.Errorf("error building dns scope: %v", err)
	}
	c := &PodController{
		kubeClient: kubeClient,
		scope:      scope,
	}

	return c, nil
}

// Run starts the PodController and blocks until Stop is called.
func (c *PodController) Run() {
	glog.Infof("starting pod controller")

	stopCh := c.StopChannel()
	go c.runWatcher(stopCh)

	<-stopCh
	glog.Infof("shutting down pod controller")
}
// runWatcher lists all pods, publishes their records and marks the scope
// ready, then watches for pod changes from the returned resource version.
// If the watch fails it re-lists; it exits when stopCh is closed.
func (c *PodController) runWatcher(stopCh <-chan struct{}) {
	runOnce := func() (bool, error) {
		var listOpts api.ListOptions
		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		podList, err := c.kubeClient.Pods("").List(listOpts)
		if err != nil {
			return false, fmt.Errorf("error listing pods: %v", err)
		}
		for i := range podList.Items {
			pod := &podList.Items[i]
			glog.V(4).Infof("found pod: %v", pod.Name)
			c.updatePodRecords(pod)
		}
		c.scope.MarkReady()

		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		listOpts.Watch = true
		listOpts.ResourceVersion = podList.ResourceVersion
		watcher, err := c.kubeClient.Pods("").Watch(listOpts)
		if err != nil {
			return false, fmt.Errorf("error watching pods: %v", err)
		}
		ch := watcher.ResultChan()
		for {
			select {
			case <-stopCh:
				glog.Infof("Got stop signal")
				return true, nil
			case event, ok := <-ch:
				if !ok {
					glog.Infof("pod watch channel closed")
					return false, nil
				}

				pod := event.Object.(*v1.Pod)
				glog.V(4).Infof("pod changed: %s %v", event.Type, pod.Name)

				switch event.Type {
				case watch.Added, watch.Modified:
					c.updatePodRecords(pod)

				case watch.Deleted:
					c.scope.Replace(pod.Name, nil)

				default:
					glog.Warningf("Unknown event type: %v", event.Type)
				}
			}
		}
	}

	for {
		stop, err := runOnce()
		if stop {
			return
		}

		if err != nil {
			glog.Warningf("Unexpected error in event watch, will retry: %v", err)
			time.Sleep(10 * time.Second)
		}
	}
}
// updatePodRecords computes DNS records for a pod from its dns annotations
// and replaces the pod's entry in the scope.
//
// dns.alpha.kubernetes.io/external: for HostNetwork pods, creates alias
// records pointing at the node's external alias target.
// dns.alpha.kubernetes.io/internal: for HostNetwork pods, creates A records
// pointing at the pod IP.
// Each annotation value is a comma-separated list of FQDNs.
func (c *PodController) updatePodRecords(pod *v1.Pod) {
	var records []dns.Record

	specExternal := pod.Annotations[AnnotationNameDnsExternal]
	if specExternal != "" {
		var aliases []string
		if pod.Spec.HostNetwork {
			if pod.Spec.NodeName != "" {
				aliases = append(aliases, "node/"+pod.Spec.NodeName+"/external")
			}
		} else {
			glog.V(4).Infof("Pod %q had %s=%s, but was not HostNetwork", pod.Name, AnnotationNameDnsExternal, specExternal)
		}

		tokens := strings.Split(specExternal, ",")
		for _, token := range tokens {
			token = strings.TrimSpace(token)

			fqdn := dns.EnsureDotSuffix(token)
			for _, alias := range aliases {
				records = append(records, dns.Record{
					RecordType: dns.RecordTypeAlias,
					FQDN:       fqdn,
					Value:      alias,
				})
			}
		}
	} else {
		glog.V(4).Infof("Pod %q did not have %s annotation", pod.Name, AnnotationNameDnsExternal)
	}

	specInternal := pod.Annotations[AnnotationNameDnsInternal]
	if specInternal != "" {
		var ips []string
		if pod.Spec.HostNetwork {
			if pod.Status.PodIP != "" {
				ips = append(ips, pod.Status.PodIP)
			}
		} else {
			glog.V(4).Infof("Pod %q had %s=%s, but was not HostNetwork", pod.Name, AnnotationNameDnsInternal, specInternal)
		}

		tokens := strings.Split(specInternal, ",")
		for _, token := range tokens {
			token = strings.TrimSpace(token)

			fqdn := dns.EnsureDotSuffix(token)
			for _, ip := range ips {
				records = append(records, dns.Record{
					RecordType: dns.RecordTypeA,
					FQDN:       fqdn,
					Value:      ip,
				})
			}
		}
	} else {
		// Was "did not have %s label": this value is read from pod.Annotations,
		// so say "annotation" (consistent with the external branch above)
		glog.V(4).Infof("Pod %q did not have %s annotation", pod.Name, AnnotationNameDnsInternal)
	}

	c.scope.Replace(pod.Name, records)
}

View File

@ -0,0 +1,162 @@
package watchers
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kops/dns-controller/pkg/dns"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3/typed/core/v1"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
"strings"
)
// ServiceController watches for services with dns annotations
type ServiceController struct {
	util.Stoppable
	kubeClient *client.CoreClient
	// scope is this controller's record group on the DNS context
	scope dns.Scope
}

// NewServiceController creates a ServiceController registered under the
// "service" scope of the given DNS context.
func NewServiceController(kubeClient *client.CoreClient, dns dns.Context) (*ServiceController, error) {
	scope, err := dns.CreateScope("service")
	if err != nil {
		return nil, fmt.Errorf("error building dns scope: %v", err)
	}
	c := &ServiceController{
		kubeClient: kubeClient,
		scope:      scope,
	}

	return c, nil
}

// Run starts the ServiceController and blocks until Stop is called.
func (c *ServiceController) Run() {
	glog.Infof("starting service controller")

	stopCh := c.StopChannel()
	go c.runWatcher(stopCh)

	<-stopCh
	glog.Infof("shutting down service controller")
}
// runWatcher lists all services, publishes their records and marks the scope
// ready, then watches for service changes from the returned resource version.
// If the watch fails it re-lists; it exits when stopCh is closed.
func (c *ServiceController) runWatcher(stopCh <-chan struct{}) {
	runOnce := func() (bool, error) {
		var listOpts api.ListOptions
		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		serviceList, err := c.kubeClient.Services("").List(listOpts)
		if err != nil {
			return false, fmt.Errorf("error listing services: %v", err)
		}
		for i := range serviceList.Items {
			service := &serviceList.Items[i]
			glog.V(4).Infof("found service: %v", service.Name)
			c.updateServiceRecords(service)
		}
		c.scope.MarkReady()

		glog.Warningf("querying without label filter")
		listOpts.LabelSelector = labels.Everything()
		glog.Warningf("querying without field filter")
		listOpts.FieldSelector = fields.Everything()
		listOpts.Watch = true
		listOpts.ResourceVersion = serviceList.ResourceVersion
		watcher, err := c.kubeClient.Services("").Watch(listOpts)
		if err != nil {
			return false, fmt.Errorf("error watching services: %v", err)
		}
		ch := watcher.ResultChan()
		for {
			select {
			case <-stopCh:
				glog.Infof("Got stop signal")
				return true, nil
			case event, ok := <-ch:
				if !ok {
					glog.Infof("service watch channel closed")
					return false, nil
				}

				service := event.Object.(*v1.Service)
				glog.V(4).Infof("service changed: %s %v", event.Type, service.Name)

				switch event.Type {
				case watch.Added, watch.Modified:
					c.updateServiceRecords(service)

				case watch.Deleted:
					c.scope.Replace(service.Name, nil)

				default:
					glog.Warningf("Unknown event type: %v", event.Type)
				}
			}
		}
	}

	for {
		stop, err := runOnce()
		if stop {
			return
		}

		if err != nil {
			glog.Warningf("Unexpected error in event watch, will retry: %v", err)
			time.Sleep(10 * time.Second)
		}
	}
}
// updateServiceRecords computes DNS records for a service from its
// dns.alpha.kubernetes.io/external annotation (a comma-separated list of
// FQDNs) and the service's load-balancer ingress status, and replaces the
// service's entry in the scope.
func (c *ServiceController) updateServiceRecords(service *v1.Service) {
	var records []dns.Record

	specExternal := service.Annotations[AnnotationNameDnsExternal]
	if specExternal != "" {
		// Endpoint records from the LB status; FQDN is filled in per token below
		var ingresses []dns.Record
		for i := range service.Status.LoadBalancer.Ingress {
			ingress := &service.Status.LoadBalancer.Ingress[i]
			if ingress.Hostname != "" {
				// TODO: Support ELB aliases
				ingresses = append(ingresses, dns.Record{
					RecordType: dns.RecordTypeCNAME,
					Value:      ingress.Hostname,
				})
			}
			if ingress.IP != "" {
				ingresses = append(ingresses, dns.Record{
					RecordType: dns.RecordTypeA,
					Value:      ingress.IP,
				})
			}
		}

		tokens := strings.Split(specExternal, ",")
		for _, token := range tokens {
			token = strings.TrimSpace(token)

			fqdn := dns.EnsureDotSuffix(token)
			for _, ingress := range ingresses {
				r := ingress
				r.FQDN = fqdn
				records = append(records, r)
			}
		}
	} else {
		// Was logging AnnotationNameDnsInternal, but the annotation checked
		// above is the external one
		glog.V(4).Infof("Service %q did not have %s annotation", service.Name, AnnotationNameDnsExternal)
	}

	c.scope.Replace(service.Name, records)
}

View File

@ -0,0 +1,12 @@
FROM debian:jessie

# Install packages:
#  curl (to download golang)
#  gcc & make (for building)
RUN apt-get update && apt-get install --yes curl gcc make

# Install golang
RUN curl -L https://storage.googleapis.com/golang/go1.6.3.linux-amd64.tar.gz | tar zx -C /usr/local
ENV PATH $PATH:/usr/local/go/bin

# onbuild.sh performs the actual build; the Makefile's
# dns-controller-build-in-docker target invokes it via `docker run ... /onbuild.sh`
COPY images/dns-controller-builder/onbuild.sh /onbuild.sh

View File

@ -0,0 +1,13 @@
#!/bin/bash
# Build script run inside the dns-controller-builder container.
# Creates a GOPATH workspace that points at the mounted /src tree, builds
# dns-controller, and copies the binary into /src/.build/artifacts/.

# Fail fast: without this, a failed `make` still copied a stale (or missing)
# binary and the script exited 0.
set -e

mkdir -p /go
export GOPATH=/go

mkdir -p /go/src/k8s.io
ln -s /src/ /go/src/k8s.io/kops

cd /go/src/k8s.io/kops/
make dns-controller-gocode

mkdir -p /src/.build/artifacts/
cp /go/bin/dns-controller /src/.build/artifacts/

View File

@ -0,0 +1,9 @@
FROM debian:jessie

# ca-certificates: Needed to talk to EC2 API
RUN apt-get update && apt-get install --yes ca-certificates

COPY /.build/artifacts/dns-controller /usr/bin/dns-controller

# Exec form so dns-controller runs as PID 1 and receives SIGTERM directly,
# instead of being wrapped in `/bin/sh -c` (shell form), which swallows signals.
CMD ["/usr/bin/dns-controller"]