dns-controller optimizations

* Cache list of zones
* Combine DNS changesets
* If we have to list records for a zone, cache them in the dns operation
This commit is contained in:
Justin Santa Barbara 2017-01-15 22:02:18 -05:00
parent 2d608070ba
commit 4504028b9b
4 changed files with 182 additions and 48 deletions

View File

@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dns
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
)
// dnsCache is a wrapper around the DNS provider, adding some caching
type dnsCache struct {
// zones is the DNS provider
zonesProvider dnsprovider.Zones
// mutex protects the following mutable state
mutex sync.Mutex
cachedZones []dnsprovider.Zone
cachedZonesTimestamp int64
}
func newDNSCache(provider dnsprovider.Interface) (*dnsCache, error) {
zonesProvider, ok := provider.Zones()
if !ok {
return nil, fmt.Errorf("DNS provider does not support zones")
}
return &dnsCache{
zonesProvider: zonesProvider,
}, nil
}
// nanoTime is a stand-in until we get a monotonic clock
func nanoTime() int64 {
return time.Now().UnixNano()
}
// ListZones returns the zones, using a cached copy if validity has not yet expired.
// This is not a cheap call with a large number of hosted zones, hence the caching.
func (d *dnsCache) ListZones(validity time.Duration) ([]dnsprovider.Zone, error) {
d.mutex.Lock()
defer d.mutex.Unlock()
now := nanoTime()
if d.cachedZones != nil {
if (d.cachedZonesTimestamp + validity.Nanoseconds()) > now {
return d.cachedZones, nil
} else {
glog.V(2).Infof("querying all DNS zones (cache expired)")
}
} else {
glog.V(2).Infof("querying all DNS zones (no cached results)")
}
zones, err := d.zonesProvider.List()
if err != nil {
return nil, fmt.Errorf("error querying for DNS zones: %v", err)
}
d.cachedZones = zones
d.cachedZonesTimestamp = now
return zones, nil
}

View File

@ -32,6 +32,8 @@ import (
"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
)
var zoneListCacheValidity = time.Minute * 15
const DefaultTTL = time.Minute
// DNSController applies the desired DNS state to the DNS backend
@ -40,8 +42,7 @@ type DNSController struct {
util.Stoppable
// zones is the DNS provider
zones dnsprovider.Zones
dnsCache *dnsCache
// mutex protects the following mutable state
mutex sync.Mutex
@ -79,22 +80,18 @@ type DNSControllerScope struct {
var _ Scope = &DNSControllerScope{}
// NewDnsController creates a DnsController
func NewDNSController(provider dnsprovider.Interface, zoneRules *ZoneRules) (*DNSController, error) {
if provider == nil {
return nil, fmt.Errorf("must pass provider")
func NewDNSController(dnsProvider dnsprovider.Interface, zoneRules *ZoneRules) (*DNSController, error) {
dnsCache, err := newDNSCache(dnsProvider)
if err != nil {
return nil, fmt.Errorf("error initializing DNS cache: %v", err)
}
c := &DNSController{
scopes: make(map[string]*DNSControllerScope),
zoneRules: zoneRules,
dnsCache: dnsCache,
}
zones, ok := provider.Zones()
if !ok {
return nil, fmt.Errorf("DNS provider does not support zones")
}
c.zones = zones
return c, nil
}
@ -234,7 +231,7 @@ func (c *DNSController) runOnce() error {
oldValueMap = c.lastSuccessfulSnapshot.recordValues
}
op, err := newDNSOp(c.zoneRules, c.zones)
op, err := newDNSOp(c.zoneRules, c.dnsCache)
if err != nil {
return err
}
@ -282,6 +279,13 @@ func (c *DNSController) runOnce() error {
}
}
for key, changeset := range op.changesets {
glog.V(2).Infof("applying DNS changeset for zone %s", key)
if err := changeset.Apply(); err != nil {
errors = append(errors, fmt.Errorf("error applying DNS changeset: %v", err))
}
}
if len(errors) != 0 {
return errors[0]
}
@ -293,17 +297,17 @@ func (c *DNSController) runOnce() error {
return nil
}
// dnsOp manages a single dns change; we cache results and state for the duration of the operation
type dnsOp struct {
zonesProvider dnsprovider.Zones
zones map[string]dnsprovider.Zone
dnsCache *dnsCache
zones map[string]dnsprovider.Zone
recordsCache map[string][]dnsprovider.ResourceRecordSet
changesets map[string]dnsprovider.ResourceRecordChangeset
}
func newDNSOp(zoneRules *ZoneRules, zonesProvider dnsprovider.Zones) (*dnsOp, error) {
o := &dnsOp{
zonesProvider: zonesProvider,
}
zones, err := zonesProvider.List()
func newDNSOp(zoneRules *ZoneRules, dnsCache *dnsCache) (*dnsOp, error) {
zones, err := dnsCache.ListZones(zoneListCacheValidity)
if err != nil {
return nil, fmt.Errorf("error querying for zones: %v", err)
}
@ -336,7 +340,13 @@ func newDNSOp(zoneRules *ZoneRules, zonesProvider dnsprovider.Zones) (*dnsOp, er
glog.Warningf("Found multiple zones for name %q, won't manage zone (To fix: provide zone mapping flag with ID of zone)", name)
}
}
o.zones = zoneMap
o := &dnsOp{
dnsCache: dnsCache,
zones: zoneMap,
changesets: make(map[string]dnsprovider.ResourceRecordChangeset),
recordsCache: make(map[string][]dnsprovider.ResourceRecordSet),
}
return o, nil
}
@ -363,6 +373,45 @@ func (o *dnsOp) findZone(fqdn string) dnsprovider.Zone {
}
}
func (o *dnsOp) getChangeset(zone dnsprovider.Zone) (dnsprovider.ResourceRecordChangeset, error) {
key := zone.Name() + "::" + zone.ID()
changeset := o.changesets[key]
if changeset == nil {
rrsProvider, ok := zone.ResourceRecordSets()
if !ok {
return nil, fmt.Errorf("zone does not support resource records %q", zone.Name())
}
changeset = rrsProvider.StartChangeset()
o.changesets[key] = changeset
}
return changeset, nil
}
// listRecords is a wrapper around listing records, but will cache the results for the duration of the dnsOp
func (o *dnsOp) listRecords(zone dnsprovider.Zone) ([]dnsprovider.ResourceRecordSet, error) {
key := zone.Name() + "::" + zone.ID()
rrs := o.recordsCache[key]
if rrs == nil {
rrsProvider, ok := zone.ResourceRecordSets()
if !ok {
return nil, fmt.Errorf("zone does not support resource records %q", zone.Name())
}
glog.V(2).Infof("Querying all dnsprovider records for zone %q", zone.Name())
var err error
rrs, err = rrsProvider.List()
if err != nil {
return nil, fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
}
o.recordsCache[key] = rrs
}
return rrs, nil
}
func (o *dnsOp) deleteRecords(k recordKey) error {
glog.V(2).Infof("Deleting all records for %s", k)
@ -374,19 +423,16 @@ func (o *dnsOp) deleteRecords(k recordKey) error {
return fmt.Errorf("no suitable zone found for %q", fqdn)
}
rrsProvider, ok := zone.ResourceRecordSets()
if !ok {
return fmt.Errorf("zone does not support resource records %q", zone.Name())
}
rrs, err := rrsProvider.List()
rrs, err := o.listRecords(zone)
if err != nil {
return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
}
cs := rrsProvider.StartChangeset()
cs, err := o.getChangeset(zone)
if err != nil {
return err
}
empty := true
for _, rr := range rrs {
rrName := EnsureDotSuffix(rr.Name())
if rrName != fqdn {
@ -400,23 +446,12 @@ func (o *dnsOp) deleteRecords(k recordKey) error {
glog.V(2).Infof("Deleting resource record %s %s", rrName, rr.Type())
cs.Remove(rr)
empty = false
}
if empty {
return nil
}
if err := cs.Apply(); err != nil {
return fmt.Errorf("error deleting DNS resource records: %v", err)
}
return nil
}
func (o *dnsOp) updateRecords(k recordKey, newRecords []string, ttl int64) error {
glog.V(2).Infof("applying changes to DNS provider for %s: %v", k, newRecords)
fqdn := EnsureDotSuffix(k.FQDN)
zone := o.findZone(fqdn)
@ -430,7 +465,7 @@ func (o *dnsOp) updateRecords(k recordKey, newRecords []string, ttl int64) error
return fmt.Errorf("zone does not support resource records %q", zone.Name())
}
rrs, err := rrsProvider.List()
rrs, err := o.listRecords(zone)
if err != nil {
return fmt.Errorf("error querying resource records for zone %q: %v", zone.Name(), err)
}
@ -449,24 +484,26 @@ func (o *dnsOp) updateRecords(k recordKey, newRecords []string, ttl int64) error
if existing != nil {
glog.Warningf("Found multiple matching records: %v and %v", existing, rr)
} else {
glog.V(8).Infof("Found matching record: %s %s", k.RecordType, rrName)
}
existing = rr
}
cs := rrsProvider.StartChangeset()
cs, err := o.getChangeset(zone)
if err != nil {
return err
}
if existing != nil {
glog.V(2).Infof("will replace existing dns record %s %s", existing.Type(), existing.Name())
cs.Remove(existing)
}
glog.V(2).Infof("Updating resource record %s %s", k, newRecords)
glog.V(2).Infof("Adding DNS changes to batch %s %s", k, newRecords)
rr := rrsProvider.New(fqdn, newRecords, ttl, rrstype.RrsType(k.RecordType))
cs.Add(rr)
if err := cs.Apply(); err != nil {
return fmt.Errorf("error updating resource record %s %s: %v", fqdn, rr.Type(), err)
}
return nil
}
@ -503,7 +540,7 @@ func (s *DNSControllerScope) Replace(recordName string, records []Record) {
s.Records[recordName] = records
}
glog.V(2).Infof("Update %s/%s: %v", s.ScopeName, recordName, records)
glog.V(2).Infof("Update desired state: %s/%s: %v", s.ScopeName, recordName, records)
s.parent.recordChange()
}

View File

@ -44,3 +44,15 @@ type Record struct {
func AliasForNodesInRole(role, roleType string) string {
return "node/role=" + role + "/" + roleType
}
func (r *Record) String() string {
s := "Record:[Type=" + string(r.RecordType) + ",FQDN=" + r.FQDN + ",Value=" + r.Value
if r.AliasTarget {
s += ",AliasTarget"
}
s += "]"
return s
}

View File

@ -144,6 +144,8 @@ func precreateDNS(cluster *api.Cluster, cloud fi.Cloud) error {
return fmt.Errorf("error getting DNS resource records for %q", zone.Name())
}
// TODO: We should change the filter to be a suffix match instead
//records, err := rrs.List("", "")
records, err := rrs.List()
if err != nil {
return fmt.Errorf("error listing DNS resource records for %q: %v", zone.Name(), err)