// kops/dns-controller/pkg/dns/dnscontroller.go

package dns
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kops/dns-controller/pkg/util"
"k8s.io/kops/upup/pkg/fi/utils"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
"sort"
"strings"
"sync"
"sync/atomic"
)
// DNSController applies the desired DNS state to the DNS backend.
type DNSController struct {
	// changeCount is a change-counter, which helps us avoid computation when
	// nothing has changed. It is accessed only via sync/atomic, so it MUST stay
	// the first field: on 386, ARM and 32-bit MIPS only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned, and 64-bit atomic
	// operations on a misaligned word panic there.
	changeCount uint64

	// zoneRules decides which hosted zones this controller manages.
	zoneRules *ZoneRules

	util.Stoppable

	// zones is the DNS provider
	zones dnsprovider.Zones

	// mutex protects the following mutable state
	mutex sync.Mutex
	// scopes is a map for each top-level grouping
	scopes map[string]*DNSControllerScope
	// lastSuccessfulSnapshot is the last snapshot we were able to apply to DNS.
	// This lets us perform incremental updates to DNS.
	lastSuccessfulSnapshot *snapshot
}

// DNSController is a Context
var _ Context = (*DNSController)(nil)
// DNSControllerScope is one top-level grouping of records, populated by a
// single source and owned by a DNSController.
type DNSControllerScope struct {
	// parent is the controller that owns this scope and is told about changes.
	parent *DNSController

	// ScopeName is the string id for this scope
	ScopeName string

	// mutex protects Ready and Records.
	mutex sync.Mutex
	// Ready is set once the populating controller has performed an initial
	// synchronization of records.
	Ready bool
	// Records maps a record name to the records currently held for it.
	Records map[string][]Record
}

// DNSControllerScope is a Scope
var _ Scope = (*DNSControllerScope)(nil)
// NewDNSController creates a DNSController that writes records through the
// zones API of provider, using zoneRules to pick the hosted zones to manage.
// It returns an error if provider is nil or does not expose a zones API.
func NewDNSController(provider dnsprovider.Interface, zoneRules *ZoneRules) (*DNSController, error) {
	if provider == nil {
		return nil, fmt.Errorf("must pass provider")
	}

	zonesAPI, supported := provider.Zones()
	if !supported {
		return nil, fmt.Errorf("DNS provider does not support zones")
	}

	return &DNSController{
		zoneRules: zoneRules,
		zones:     zonesAPI,
		scopes:    make(map[string]*DNSControllerScope),
	}, nil
}
// Run starts the reconciliation loop on a background goroutine, then blocks
// until the controller's stop channel fires.
func (c *DNSController) Run() {
	glog.Infof("starting DNS controller")

	stop := c.StopChannel()
	go c.runWatcher(stop)
	<-stop

	glog.Infof("shutting down DNS controller")
}
// runWatcher repeatedly runs a reconciliation pass until a stop is requested.
//
// After a failed pass it waits 10 seconds before retrying. After a successful
// pass it waits 5 seconds as simple debouncing, since DNS servers are
// typically pretty slow anyway. Both waits are interrupted by stopCh, so
// shutdown is no longer delayed by a pending sleep.
//
// NOTE(review): this relies on stopCh being closed (not sent on) when a stop
// is requested, because Run also waits on the same channel.
func (c *DNSController) runWatcher(stopCh <-chan struct{}) {
	for {
		err := c.runOnce()
		if c.StopRequested() {
			glog.Infof("exiting dns controller loop")
			return
		}

		delay := 5 * time.Second
		if err != nil {
			glog.Warningf("Unexpected error in DNS controller, will retry: %v", err)
			delay = 10 * time.Second
		}

		timer := time.NewTimer(delay)
		select {
		case <-stopCh:
			timer.Stop()
			glog.Infof("exiting dns controller loop")
			return
		case <-timer.C:
		}
	}
}
// snapshot is a copy of the records of every scope, tagged with the value of
// the controller's change counter observed when it was taken.
type snapshot struct {
	// changeCount is the change counter value read when the copy was made.
	changeCount uint64

	// aliasTargets groups alias-target records by their FQDN.
	aliasTargets map[string][]Record
	// records holds every record that is not an alias target.
	records []Record

	// recordValues is the resolved, sorted list of values per (type, FQDN);
	// it is filled in after the snapshot is taken.
	recordValues map[recordKey][]string
}
// snapshotIfChangedAndReady copies the records of every scope into a new
// snapshot. It returns nil when there is nothing to apply:
//   - the change counter equals that of the last successfully applied
//     snapshot, or
//   - some scope has not completed its initial synchronization.
//
// Alias-target records go into snapshot.aliasTargets, keyed by FQDN; all
// other records go into snapshot.records.
//
// Each scope's Ready and Records fields are read while holding that scope's
// mutex, because MarkReady and Replace mutate them under that lock only.
// Lock order is c.mutex, then the scope mutex.
func (c *DNSController) snapshotIfChangedAndReady() *snapshot {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// The counter is read before the records. A change that lands while we
	// copy may be included under the older count; the next pass then simply
	// re-applies, which is harmless.
	s := &snapshot{
		changeCount: atomic.LoadUint64(&c.changeCount),
	}
	if c.lastSuccessfulSnapshot != nil && s.changeCount == c.lastSuccessfulSnapshot.changeCount {
		glog.V(4).Infof("No changes since DNS values last successfully applied")
		return nil
	}

	// First pass: every scope must be ready. Also size the output slice.
	recordCount := 0
	for _, scope := range c.scopes {
		scope.mutex.Lock()
		ready := scope.Ready
		if ready {
			for _, scopeRecords := range scope.Records {
				recordCount += len(scopeRecords)
			}
		}
		scope.mutex.Unlock()

		if !ready {
			glog.Infof("scope not yet ready: %s", scope.ScopeName)
			return nil
		}
	}

	// Second pass: copy the records. recordCount is only a capacity hint, so
	// records changing between the passes is fine.
	records := make([]Record, 0, recordCount)
	aliasTargets := make(map[string][]Record)
	for _, scope := range c.scopes {
		scope.mutex.Lock()
		for _, scopeRecords := range scope.Records {
			for i := range scopeRecords {
				r := &scopeRecords[i]
				if r.AliasTarget {
					aliasTargets[r.FQDN] = append(aliasTargets[r.FQDN], *r)
				} else {
					records = append(records, *r)
				}
			}
		}
		scope.mutex.Unlock()
	}

	s.records = records
	s.aliasTargets = aliasTargets
	return s
}
// recordKey identifies one DNS resource record set: a record type together
// with a fully-qualified name. It is the map key used when diffing desired
// record values against the last successfully applied snapshot.
//
// Keys are printed with %s in log messages, so field order affects log output.
type recordKey struct {
// RecordType is the DNS record type of the set.
RecordType RecordType
// FQDN is the fully-qualified record name.
FQDN string
}
// runOnce performs a single reconciliation pass.
//
// The pass has four steps:
//   - Take a snapshot of the desired records. Return nil immediately if
//     nothing changed or some scope is not ready.
//   - Resolve the snapshot into the desired values for each (record type,
//     FQDN) key.
//   - Rewrite, via updateRecords, every key whose values differ from the last
//     successfully applied snapshot.
//   - Delete, via deleteRecords, every key that existed before but is absent
//     now.
//
// Failures for individual keys are logged and collected so one bad record
// does not block the rest. The first such error is returned and the baseline
// is left unchanged, so the next pass retries against the same baseline. An
// error is also returned if a stop is requested mid-pass.
func (c *DNSController) runOnce() error {
	snapshot := c.snapshotIfChangedAndReady()
	if snapshot == nil {
		// Unchanged / not ready
		return nil
	}

	newValueMap := snapshot.resolveRecordValues()
	snapshot.recordValues = newValueMap

	// Read the baseline under c.mutex, consistent with how it is written below.
	var oldValueMap map[recordKey][]string
	c.mutex.Lock()
	if c.lastSuccessfulSnapshot != nil {
		oldValueMap = c.lastSuccessfulSnapshot.recordValues
	}
	c.mutex.Unlock()

	op, err := newDNSOp(c.zoneRules, c.zones)
	if err != nil {
		return err
	}

	// ttl is the TTL, in seconds, written on every record set we update.
	const ttl = 60

	// Store a list of all the errors, so that one bad apple doesn't block every other request
	var errs []error

	// Check each hostname for changes and apply them
	for k, newValues := range newValueMap {
		if c.StopRequested() {
			return fmt.Errorf("stop requested")
		}
		if utils.StringSlicesEqual(newValues, oldValueMap[k]) {
			glog.V(4).Infof("no change to records for %s", k)
			continue
		}
		glog.Infof("Using default TTL of %d seconds", ttl)
		if err := op.updateRecords(k, newValues, int64(ttl)); err != nil {
			glog.Infof("error updating records for %s: %v", k, err)
			errs = append(errs, err)
		}
	}

	// Look for deleted hostnames
	for k := range oldValueMap {
		if c.StopRequested() {
			return fmt.Errorf("stop requested")
		}
		if _, stillWanted := newValueMap[k]; stillWanted {
			continue
		}
		if err := op.deleteRecords(k); err != nil {
			glog.Infof("error deleting records for %s: %v", k, err)
			errs = append(errs, err)
		}
	}

	if len(errs) != 0 {
		return errs[0]
	}

	// Success! Store the snapshot as our new baseline
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.lastSuccessfulSnapshot = snapshot
	return nil
}

// resolveRecordValues expands the snapshot's records into the desired values
// for each (record type, FQDN) key. Each value list is sorted so it can be
// compared directly against a previous snapshot's values.
//
// For an alias record (RecordTypeAlias), Value names an FQDN. The alias
// contributes the values of the alias-target records registered under that
// FQDN, each keyed by the target's own record type.
func (s *snapshot) resolveRecordValues() map[recordKey][]string {
	values := make(map[recordKey][]string)
	for _, r := range s.records {
		if r.RecordType != RecordTypeAlias {
			key := recordKey{RecordType: r.RecordType, FQDN: r.FQDN}
			values[key] = append(values[key], r.Value)
			continue
		}

		targets := s.aliasTargets[r.Value]
		if len(targets) == 0 {
			glog.Infof("Alias in record specified %q, but no records were found for that name", r.Value)
		}
		for _, target := range targets {
			// TODO: Support chains: alias of alias (etc)
			key := recordKey{RecordType: target.RecordType, FQDN: r.FQDN}
			values[key] = append(values[key], target.Value)
		}
	}

	// Normalize: order-independent comparison needs sorted values.
	for _, v := range values {
		sort.Strings(v)
	}
	return values
}
// dnsOp holds one reconciliation pass's view of the DNS provider: the zones
// API plus the hosted zones selected for management.
type dnsOp struct {
	// zones maps a dot-suffixed zone name to the single zone managed for it.
	zones map[string]dnsprovider.Zone

	// zonesProvider is the provider's zones API.
	zonesProvider dnsprovider.Zones
}
// newDNSOp lists every hosted zone from zonesProvider and picks, for each
// dot-suffixed zone name, the one zone to manage:
//   - zones matched explicitly by zoneRules are the candidates;
//   - if none match explicitly and zoneRules.Wildcard is set, every zone with
//     that name is a candidate;
//   - exactly one candidate means the name is managed; several means the name
//     is skipped with a warning; none means it is ignored.
func newDNSOp(zoneRules *ZoneRules, zonesProvider dnsprovider.Zones) (*dnsOp, error) {
	allZones, err := zonesProvider.List()
	if err != nil {
		return nil, fmt.Errorf("error querying for zones: %v", err)
	}

	// Group every zone under its dot-suffixed name; several zones may share one.
	byName := make(map[string][]dnsprovider.Zone)
	for _, z := range allZones {
		name := EnsureDotSuffix(z.Name())
		byName[name] = append(byName[name], z)
	}

	selected := make(map[string]dnsprovider.Zone)
	for name, candidates := range byName {
		var matches []dnsprovider.Zone
		for _, z := range candidates {
			if zoneRules.MatchesExplicitly(z) {
				matches = append(matches, z)
			}
		}
		if len(matches) == 0 && zoneRules.Wildcard {
			// No explicit matches but wildcard; treat everything as matching
			matches = append(matches, candidates...)
		}

		switch len(matches) {
		case 0:
			// Nothing to manage under this name.
		case 1:
			selected[name] = matches[0]
		default:
			glog.Warningf("Found multiple zones for name %q, won't manage zone (To fix: provide zone mapping flag with ID of zone)", name)
		}
	}

	return &dnsOp{
		zonesProvider: zonesProvider,
		zones:         selected,
	}, nil
}
// EnsureDotSuffix returns s with a trailing "." appended, unless s already
// ends with one. An empty string becomes ".".
func EnsureDotSuffix(s string) string {
	if strings.HasSuffix(s, ".") {
		return s
	}
	return s + "."
}
// findZone returns the most specific managed zone that contains fqdn. It
// tries fqdn itself (dot-suffixed), then each parent domain in turn by
// stripping the leftmost label. It returns nil when no managed zone matches.
func (o *dnsOp) findZone(fqdn string) dnsprovider.Zone {
	for name := EnsureDotSuffix(fqdn); ; {
		if zone := o.zones[name]; zone != nil {
			return zone
		}
		dot := strings.IndexByte(name, '.')
		if dot < 0 {
			return nil
		}
		name = name[dot+1:]
	}
}
// deleteRecords removes every resource record set in the zone containing
// k.FQDN whose dot-suffixed name equals k.FQDN and whose type equals
// k.RecordType. If nothing matches, no changeset is applied and nil is
// returned.
func (o *dnsOp) deleteRecords(k recordKey) error {
	glog.V(2).Infof("Deleting all records for %s", k)

	zone := o.findZone(k.FQDN)
	if zone == nil {
		// TODO: Post event into service / pod
		return fmt.Errorf("no suitable zone found for %q", k.FQDN)
	}

	rrsProvider, supported := zone.ResourceRecordSets()
	if !supported {
		return fmt.Errorf("zone does not support resource records %q", zone.Name())
	}

	existing, err := rrsProvider.List()
	if err != nil {
		return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
	}

	changeset := rrsProvider.StartChangeset()
	removed := 0
	for _, rr := range existing {
		name := EnsureDotSuffix(rr.Name())
		switch {
		case name != k.FQDN:
			glog.V(8).Infof("Skipping delete of record %q (name != %s)", name, k.FQDN)
		case string(rr.Type()) != string(k.RecordType):
			glog.V(8).Infof("Skipping delete of record %q (type %s != %s)", name, rr.Type(), k.RecordType)
		default:
			glog.V(2).Infof("Deleting resource record %s %s", name, rr.Type())
			changeset.Remove(rr)
			removed++
		}
	}

	if removed == 0 {
		return nil
	}
	if err := changeset.Apply(); err != nil {
		return fmt.Errorf("error deleting DNS resource records: %v", err)
	}
	return nil
}
// updateRecords replaces the resource record set for k (name k.FQDN, type
// k.RecordType) with one holding newRecords and the given TTL in seconds.
//
// An existing record set with the same name and type is removed in the same
// changeset as the addition. If several match, a warning is logged and only
// the last one is removed.
//
// Provider record names are dot-suffixed before comparison, matching
// deleteRecords and newDNSOp. Without this, a provider that omits the
// trailing dot would never have its existing set found and removed before
// the add.
func (o *dnsOp) updateRecords(k recordKey, newRecords []string, ttl int64) error {
	glog.V(2).Infof("Updating records for %s: %v", k, newRecords)

	zone := o.findZone(k.FQDN)
	if zone == nil {
		// TODO: Post event into service / pod
		return fmt.Errorf("no suitable zone found for %q", k.FQDN)
	}

	rrsProvider, ok := zone.ResourceRecordSets()
	if !ok {
		return fmt.Errorf("zone does not support resource records %q", zone.Name())
	}

	rrs, err := rrsProvider.List()
	if err != nil {
		return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
	}

	var existing dnsprovider.ResourceRecordSet
	for _, rr := range rrs {
		rrName := EnsureDotSuffix(rr.Name())
		if rrName != k.FQDN {
			glog.V(8).Infof("Skipping record %q (name != %s)", rrName, k.FQDN)
			continue
		}
		if string(rr.Type()) != string(k.RecordType) {
			glog.V(8).Infof("Skipping record %q (type %s != %s)", rrName, rr.Type(), k.RecordType)
			continue
		}
		if existing != nil {
			glog.Warningf("Found multiple matching records: %v and %v", existing, rr)
		}
		existing = rr
	}

	cs := rrsProvider.StartChangeset()
	if existing != nil {
		cs.Remove(existing)
	}
	glog.V(2).Infof("Updating resource record %s %s", k, newRecords)
	rr := rrsProvider.New(k.FQDN, newRecords, ttl, rrstype.RrsType(k.RecordType))
	cs.Add(rr)
	if err := cs.Apply(); err != nil {
		return fmt.Errorf("error updating resource record %s %s: %v", k.FQDN, rr.Type(), err)
	}
	return nil
}
// recordChange atomically bumps the change counter. The next reconciliation
// pass then sees a count different from the last applied snapshot's.
func (c *DNSController) recordChange() {
	counter := &c.changeCount
	atomic.AddUint64(counter, 1)
}
// MarkReady records that the populating controller has finished its initial
// synchronization of this scope. The flag is set under the scope's mutex.
func (s *DNSControllerScope) MarkReady() {
	s.mutex.Lock()
	s.Ready = true
	s.mutex.Unlock()
}
// Replace sets the records stored under recordName. An empty or nil records
// slice removes recordName instead. The slice is stored as-is, not copied.
// In both cases the parent controller's change counter is bumped.
func (s *DNSControllerScope) Replace(recordName string, records []Record) {
	glog.V(2).Infof("Update %s/%s: %v", s.ScopeName, recordName, records)

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if len(records) != 0 {
		s.Records[recordName] = records
	} else {
		delete(s.Records, recordName)
	}
	s.parent.recordChange()
}
// CreateScope registers and returns a new, not-yet-ready scope named
// scopeName. A name can be registered only once; a repeat returns an error.
// Supporting shared names would require turning Ready into a counter, and a
// duplicate name is most likely a caller error anyway.
func (c *DNSController) CreateScope(scopeName string) (Scope, error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if existing := c.scopes[scopeName]; existing != nil {
		return nil, fmt.Errorf("duplicate scope: %q", scopeName)
	}

	scope := &DNSControllerScope{
		ScopeName: scopeName,
		parent:    c,
		Records:   make(map[string][]Record),
	}
	c.scopes[scopeName] = scope
	return scope, nil
}