Initial implementation

This commit is contained in:
Jaromir Vanek 2021-10-18 13:25:25 -07:00
parent f9be53fbc1
commit 57be81d4bb
8 changed files with 1015 additions and 0 deletions

76
README.md Normal file
View File

@ -0,0 +1,76 @@
# corends-multicluster
## Name
*multicluster* - implementation of [Multicluster DNS](https://github.com/kubernetes/enhancements/pull/2577)
## Description
This plugin implements the [Kubernetes DNS-Based Multicluster Service Discovery
Specification](https://github.com/kubernetes/enhancements/pull/2577).
## Syntax
```
multicluster [ZONES...] {
kubeconfig KUBECONFIG [CONTEXT]
fallthrough [ZONES...]
}
```
* `kubeconfig` **KUBECONFIG [CONTEXT]** authenticates the connection to a remote k8s cluster using a kubeconfig file. **[CONTEXT]** is optional, if not set, then the current context specified in kubeconfig will be used. It supports TLS, username and password, or token-based authentication. This option is ignored if connecting in-cluster (i.e., the endpoint is not specified).
* `fallthrough` **[ZONES...]** If a query for a record in the zones for which the plugin is authoritative results in NXDOMAIN, normally that is what the response will be. However, if you specify this option, the query will instead be passed on down the plugin chain, which can include another plugin to handle the query. If **[ZONES...]** is omitted, then fallthrough happens for all zones for which the plugin is authoritative. If specific zones are listed (for example `in-addr.arpa` and `ip6.arpa`), then only queries for those zones will be subject to fallthrough.
## Startup
When CoreDNS starts with the *multicluster* plugin enabled, it will delay serving DNS for up to 5 seconds until it can connect to the Kubernetes API and synchronize all object watches. If this cannot happen within 5 seconds, then CoreDNS will start serving DNS while the *multicluster* plugin continues to try to connect and synchronize all object watches. CoreDNS will answer SERVFAIL to any request made for a Kubernetes record that has not yet been synchronized.
## Examples
Handle all queries in the `clusterset.local` zone. Connect to Kubernetes in-cluster.
```
.:53 {
multicluster clusterset.local
}
```
## Installation
See CoreDNS documentation about [Compile Time Enabling or Disabling Plugins](https://coredns.io/2017/07/25/compile-time-enabling-or-disabling-plugins/).
### Recompile coredns
Add the plugin to `plugins.cfg` file. The [ordering of plugins matters](https://coredns.io/2017/06/08/how-queries-are-processed-in-coredns/),
add it just below `kubernetes` plugin that has very similar functionality:
```
...
kubernetes:kubernetes
multicluster:github.com/vanekjar/coredns-multicluster
...
```
Follow the [coredns README](https://github.com/coredns/coredns#readme) file to build it.
### Modify cluster's corefile
To enable the plugin for `clusterset.local` zone, add `multicluster` configuration to the `corefile`. Resulting `corefile` may look like this:
```
.:53 {
errors
health
multicluster clusterset.local
kubernetes cluster.local in-addr.arpa ip6.arpa {
pods insecure
fallthrough in-addr.arpa ip6.arpa
}
prometheus :9153
forward . /etc/resolv.conf
cache 30
loop
reload
loadbalance
}
```

228
controller.go Normal file
View File

@ -0,0 +1,228 @@
package multicluster
import (
"context"
"errors"
"fmt"
"github.com/coredns/coredns/plugin/kubernetes/object"
model "github.com/vanekjar/coredns-multicluster/object"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
mcs "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
"sync"
"sync/atomic"
"time"
)
const (
svcNameNamespaceIndex = "ServiceNameNamespace"
)
type controller interface {
ServiceList() []*model.ServiceImport
SvcIndex(string) []*model.ServiceImport
GetNamespaceByName(string) (*object.Namespace, error)
Run()
HasSynced() bool
Stop() error
// Modified returns the timestamp of the most recent changes
Modified() int64
}
type control struct {
// Modified tracks timestamp of the most recent changes
// It needs to be first because it is guaranteed to be 8-byte
// aligned ( we use sync.LoadAtomic with this )
modified int64
k8sClient kubernetes.Interface
mcsClient mcs.MulticlusterV1alpha1Interface
svcImportController cache.Controller
svcImportLister cache.Indexer
nsController cache.Controller
nsLister cache.Store
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
}
func newController(ctx context.Context, k8sClient kubernetes.Interface, mcsClient mcs.MulticlusterV1alpha1Interface) *control {
ctl := control{
k8sClient: k8sClient,
mcsClient: mcsClient,
stopCh: make(chan struct{}),
}
ctl.svcImportLister, ctl.svcImportController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceImportListFunc(ctx, ctl.mcsClient, api.NamespaceAll),
WatchFunc: serviceImportWatchFunc(ctx, ctl.mcsClient, api.NamespaceAll),
},
&v1alpha1.ServiceImport{},
cache.ResourceEventHandlerFuncs{AddFunc: ctl.Add, UpdateFunc: ctl.Update, DeleteFunc: ctl.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc},
object.DefaultProcessor(model.ToServiceImport, nil),
)
ctl.nsLister, ctl.nsController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(ctx, ctl.k8sClient),
WatchFunc: namespaceWatchFunc(ctx, ctl.k8sClient),
},
&api.Namespace{},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{},
object.DefaultProcessor(object.ToNamespace, nil),
)
return &ctl
}
// Stop stops the controller.
func (c *control) Stop() error {
c.stopLock.Lock()
defer c.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !c.shutdown {
close(c.stopCh)
c.shutdown = true
return nil
}
return fmt.Errorf("shutdown already in progress")
}
// Run starts the controller.
func (c *control) Run() {
go c.svcImportController.Run(c.stopCh)
go c.nsController.Run(c.stopCh)
<-c.stopCh
}
// HasSynced calls on all controllers.
func (c *control) HasSynced() bool {
return c.svcImportController.HasSynced() && c.nsController.HasSynced()
}
func (c *control) SvcIndex(idx string) (svcs []*model.ServiceImport) {
os, err := c.svcImportLister.ByIndex(svcNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*model.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (c *control) ServiceList() (svcs []*model.ServiceImport) {
os := c.svcImportLister.List()
for _, o := range os {
s, ok := o.(*model.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func serviceImportListFunc(ctx context.Context, c mcs.MulticlusterV1alpha1Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
return c.ServiceImports(ns).List(ctx, opts)
}
}
func serviceImportWatchFunc(ctx context.Context, c mcs.MulticlusterV1alpha1Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
return c.ServiceImports(ns).Watch(ctx, options)
}
}
func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
return c.CoreV1().Namespaces().List(ctx, opts)
}
}
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
return c.CoreV1().Namespaces().Watch(ctx, options)
}
}
// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
func (c *control) GetNamespaceByName(name string) (*object.Namespace, error) {
o, exists, err := c.nsLister.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("namespace not found")
}
ns, ok := o.(*object.Namespace)
if !ok {
return nil, fmt.Errorf("found key but not namespace")
}
return ns, nil
}
func (c *control) Add(obj interface{}) { c.updateModified() }
func (c *control) Delete(obj interface{}) { c.updateModified() }
func (c *control) Update(oldObj, newObj interface{}) { c.detectChanges(oldObj, newObj) }
// detectChanges detects changes in objects, and updates the modified timestamp
func (c *control) detectChanges(oldObj, newObj interface{}) {
// If both objects have the same resource version, they are identical.
if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
return
}
obj := newObj
if obj == nil {
obj = oldObj
}
switch ob := obj.(type) {
case *model.ServiceImport:
c.updateModified()
default:
log.Warningf("Updates for %T not supported.", ob)
}
}
func (c *control) Modified() int64 {
unix := atomic.LoadInt64(&c.modified)
return unix
}
func (c *control) updateModified() {
unix := time.Now().Unix()
atomic.StoreInt64(&c.modified, unix)
}
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*model.ServiceImport)
if !ok {
return nil, errors.New("obj was not of the correct type")
}
return []string{s.Index}, nil
}

369
multicluster.go Normal file
View File

@ -0,0 +1,369 @@
package multicluster
import (
"context"
"errors"
"fmt"
"github.com/coredns/coredns/plugin/etcd/msg"
"github.com/vanekjar/coredns-multicluster/object"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/pkg/fall"
"github.com/coredns/coredns/request"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
"strings"
"time"
"github.com/coredns/coredns/plugin"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/miekg/dns"
)
const (
DNSSchemaVersion = "1.1.0"
// Svc is the DNS schema for kubernetes services
Svc = "svc"
// Pod is the DNS schema for kubernetes pods
Pod = "pod"
// defaultTTL to apply to all answers.
defaultTTL = 5
)
var (
errNoItems = errors.New("no items found")
errNsNotExposed = errors.New("namespace is not exposed")
errInvalidRequest = errors.New("invalid query name")
)
// Define log to be a logger with the plugin name in it. This way we can just use log.Info and
// friends to log.
var log = clog.NewWithPlugin(pluginName)
// MultiCluster implements a plugin supporting multi-cluster DNS spec.
type MultiCluster struct {
Next plugin.Handler
Zones []string
ClientConfig clientcmd.ClientConfig
Fall fall.F
controller controller
ttl uint32
}
func New(zones []string) *MultiCluster {
m := MultiCluster{
Zones: zones,
}
m.ttl = defaultTTL
return &m
}
func (m *MultiCluster) InitController(ctx context.Context) (onStart func() error, onShut func() error, err error) {
config, err := m.getClientConfig()
if err != nil {
return nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err)
}
mcsClient, err := v1alpha1.NewForConfig(config)
m.controller = newController(ctx, kubeClient, mcsClient)
onStart = func() error {
go func() {
m.controller.Run()
}()
timeout := time.After(5 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if m.controller.HasSynced() {
return nil
}
case <-timeout:
log.Warning("starting server with unsynced Kubernetes API")
return nil
}
}
}
onShut = func() error {
return m.controller.Stop()
}
return onStart, onShut, err
}
func (m MultiCluster) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
state := request.Request{W: w, Req: r}
qname := state.QName()
zone := plugin.Zones(m.Zones).Matches(qname)
if zone == "" {
return plugin.NextOrFailure(m.Name(), m.Next, ctx, w, r)
}
zone = qname[len(qname)-len(zone):] // maintain case of original query
state.Zone = zone
var (
records []dns.RR
extra []dns.RR
err error
)
switch state.QType() {
case dns.TypeA:
records, err = plugin.A(ctx, &m, zone, state, nil, plugin.Options{})
case dns.TypeAAAA:
records, err = plugin.AAAA(ctx, &m, zone, state, nil, plugin.Options{})
case dns.TypeTXT:
records, err = plugin.TXT(ctx, &m, zone, state, nil, plugin.Options{})
case dns.TypeSRV:
records, extra, err = plugin.SRV(ctx, &m, zone, state, plugin.Options{})
case dns.TypeNS:
if state.Name() == zone {
records, extra, err = plugin.NS(ctx, &m, zone, state, plugin.Options{})
break
}
fallthrough
default:
// Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN
fake := state.NewWithQuestion(state.QName(), dns.TypeA)
fake.Zone = state.Zone
_, err = plugin.A(ctx, &m, zone, fake, nil, plugin.Options{})
}
if m.IsNameError(err) {
if m.Fall.Through(state.Name()) {
return plugin.NextOrFailure(m.Name(), m.Next, ctx, w, r)
}
if !m.controller.HasSynced() {
// If we haven't synchronized with the kubernetes cluster, return server failure
return plugin.BackendError(ctx, &m, zone, dns.RcodeServerFailure, state, nil /* err */, plugin.Options{})
}
return plugin.BackendError(ctx, &m, zone, dns.RcodeNameError, state, nil /* err */, plugin.Options{})
}
if err != nil {
return dns.RcodeServerFailure, err
}
if len(records) == 0 {
return plugin.BackendError(ctx, &m, zone, dns.RcodeSuccess, state, nil, plugin.Options{})
}
message := new(dns.Msg)
message.SetReply(r)
message.Authoritative = true
message.Answer = append(message.Answer, records...)
message.Extra = append(message.Extra, extra...)
w.WriteMsg(message)
return dns.RcodeSuccess, nil
}
// Name implements the Handler interface.
func (m MultiCluster) Name() string { return pluginName }
// Implement ServiceBackend interface
// Services communicates with the backend to retrieve the service definitions. Exact indicates
// on exact match should be returned.
func (m MultiCluster) Services(ctx context.Context, state request.Request, exact bool, opt plugin.Options) ([]msg.Service, error) {
switch state.QType() {
case dns.TypeTXT:
// 1 label + zone, label must be "dns-version".
t, _ := dnsutil.TrimZone(state.Name(), state.Zone)
segs := dns.SplitDomainName(t)
if len(segs) != 1 {
return nil, nil
}
if segs[0] != "dns-version" {
return nil, nil
}
svc := msg.Service{Text: DNSSchemaVersion, TTL: 28800, Key: msg.Path(state.QName(), coredns)}
return []msg.Service{svc}, nil
// TODO support TypeNS
}
return m.Records(ctx, state, false)
}
// Reverse communicates with the backend to retrieve service definition based on a IP address
// instead of a name. I.e. a reverse DNS lookup.
func (m MultiCluster) Reverse(ctx context.Context, state request.Request, exact bool, opt plugin.Options) ([]msg.Service, error) {
return nil, errors.New("reverse lookup is not supported")
}
// Lookup is used to find records else where.
func (m MultiCluster) Lookup(ctx context.Context, state request.Request, name string, typ uint16) (*dns.Msg, error) {
return nil, errors.New("external lookup is not supported")
}
// Returns _all_ services that matches a certain name.
// Note: it does not implement a specific service.
func (m MultiCluster) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
r, e := parseRequest(state.Name(), state.Zone)
if e != nil {
return nil, e
}
if r.podOrSvc == "" {
return nil, nil
}
if dnsutil.IsReverse(state.Name()) > 0 {
return nil, errNoItems
}
if !wildcard(r.namespace) && !m.namespaceExists(r.namespace) {
return nil, errNsNotExposed
}
services, err := m.findServices(r, state.Zone)
return services, err
}
// IsNameError returns true if err indicated a record not found condition
func (m MultiCluster) IsNameError(err error) bool {
return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest
}
// Serial returns a SOA serial number to construct a SOA record.
func (m MultiCluster) Serial(state request.Request) uint32 {
return uint32(m.controller.Modified())
}
// MinTTL returns the minimum TTL to be used in the SOA record.
func (m MultiCluster) MinTTL(state request.Request) uint32 {
return m.ttl
}
type ResponsePrinter struct {
dns.ResponseWriter
}
// NewResponsePrinter returns ResponseWriter.
func NewResponsePrinter(w dns.ResponseWriter) *ResponsePrinter {
return &ResponsePrinter{ResponseWriter: w}
}
func (r *ResponsePrinter) WriteMsg(res *dns.Msg) error {
return r.ResponseWriter.WriteMsg(res)
}
func (m *MultiCluster) getClientConfig() (*rest.Config, error) {
if m.ClientConfig != nil {
return m.ClientConfig.ClientConfig()
}
cc, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
cc.ContentType = "application/vnd.kubernetes.protobuf"
return cc, err
}
// wildcard checks whether s contains a wildcard value defined as "*" or "any".
func wildcard(s string) bool {
return s == "*" || s == "any"
}
func (m *MultiCluster) namespaceExists(namespace string) bool {
_, err := m.controller.GetNamespaceByName(namespace)
if err != nil {
return false
}
return true
}
func (m *MultiCluster) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
if !wildcard(r.namespace) && !m.namespaceExists(r.namespace) {
return nil, errNoItems
}
// handle empty service name
if r.service == "" {
if m.namespaceExists(r.namespace) || wildcard(r.namespace) {
// NODATA
return nil, nil
}
// NXDOMAIN
return nil, errNoItems
}
err = errNoItems
if wildcard(r.service) && !wildcard(r.namespace) {
// If namespace exists, err should be nil, so that we return NODATA instead of NXDOMAIN
if m.namespaceExists(r.namespace) {
err = nil
}
}
var serviceList []*object.ServiceImport
if wildcard(r.service) || wildcard(r.namespace) {
serviceList = m.controller.ServiceList()
} else {
idx := object.ServiceKey(r.service, r.namespace)
serviceList = m.controller.SvcIndex(idx)
}
zonePath := msg.Path(zone, coredns)
for _, svc := range serviceList {
if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
continue
}
// If request namespace is a wildcard, filter results against Corefile namespace list.
// (Namespaces without a wildcard were filtered before the call to this function.)
if wildcard(r.namespace) && !m.namespaceExists(svc.Namespace) {
continue
}
// ClusterSetIP service
for _, p := range svc.Ports {
if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
continue
}
err = nil
for _, ip := range svc.ClusterIPs {
s := msg.Service{Host: ip, Port: int(p.Port), TTL: m.ttl}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
}
}
// TODO handle headless ServiceImport
}
return services, err
}
// match checks if a and b are equal taking wildcards into account.
func match(a, b string) bool {
if wildcard(a) {
return true
}
if wildcard(b) {
return true
}
return strings.EqualFold(a, b)
}
const coredns = "c" // used as a fake key prefix in msg.Service

89
object/serviceimport.go Normal file
View File

@ -0,0 +1,89 @@
package object
import (
"fmt"
"github.com/coredns/coredns/plugin/kubernetes/object"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
v1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
// ServiceImport is a stripped down api.ServiceImport with only the items we need for CoreDNS.
type ServiceImport struct {
Version string
Name string
Namespace string
Index string
ClusterIPs []string
Type v1alpha1.ServiceImportType
Ports []v1alpha1.ServicePort
*object.Empty
}
// ServiceKey returns a string using for the index.
func ServiceKey(name, namespace string) string { return name + "." + namespace }
// ToServiceImport converts an v1alpha1.ServiceImport to a *ServiceImport.
func ToServiceImport(obj meta.Object) (meta.Object, error) {
svc, ok := obj.(*v1alpha1.ServiceImport)
if !ok {
return nil, fmt.Errorf("unexpected object %v", obj)
}
s := &ServiceImport{
Version: svc.GetResourceVersion(),
Name: svc.GetName(),
Namespace: svc.GetNamespace(),
Index: ServiceKey(svc.GetName(), svc.GetNamespace()),
Type: svc.Spec.Type,
}
if len(svc.Spec.IPs) > 0 {
s.ClusterIPs = make([]string, len(svc.Spec.IPs))
copy(s.ClusterIPs, svc.Spec.IPs)
}
if len(svc.Spec.Ports) > 0 {
s.Ports = make([]v1alpha1.ServicePort, len(svc.Spec.Ports))
copy(s.Ports, svc.Spec.Ports)
}
return s, nil
}
var _ runtime.Object = &ServiceImport{}
// DeepCopyObject implements the ObjectKind interface.
func (s *ServiceImport) DeepCopyObject() runtime.Object {
s1 := &ServiceImport{
Version: s.Version,
Name: s.Name,
Namespace: s.Namespace,
Index: s.Index,
Type: s.Type,
ClusterIPs: make([]string, len(s.ClusterIPs)),
Ports: make([]v1alpha1.ServicePort, len(s.Ports)),
}
copy(s1.ClusterIPs, s.ClusterIPs)
copy(s1.Ports, s.Ports)
return s1
}
// GetNamespace implements the metav1.Object interface.
func (s *ServiceImport) GetNamespace() string { return s.Namespace }
// SetNamespace implements the metav1.Object interface.
func (s *ServiceImport) SetNamespace(namespace string) {}
// GetName implements the metav1.Object interface.
func (s *ServiceImport) GetName() string { return s.Name }
// SetName implements the metav1.Object interface.
func (s *ServiceImport) SetName(name string) {}
// GetResourceVersion implements the metav1.Object interface.
func (s *ServiceImport) GetResourceVersion() string { return s.Version }
// SetResourceVersion implements the metav1.Object interface.
func (s *ServiceImport) SetResourceVersion(version string) {}

113
parse.go Normal file
View File

@ -0,0 +1,113 @@
package multicluster
import (
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/miekg/dns"
)
type recordRequest struct {
// The named port from the kubernetes DNS spec, this is the service part (think _https) from a well formed
// SRV record.
port string
// The protocol is usually _udp or _tcp (if set), and comes from the protocol part of a well formed
// SRV record.
protocol string
endpoint string
// The servicename used in Kubernetes.
service string
// The namespace used in Kubernetes.
namespace string
// A each name can be for a pod or a service, here we track what we've seen, either "pod" or "service".
podOrSvc string
}
// parseRequest parses the qname to find all the elements we need for querying k8s. Anything
// that is not parsed will have the wildcard "*" value (except r.endpoint).
// Potential underscores are stripped from _port and _protocol.
func parseRequest(name, zone string) (r recordRequest, err error) {
// 3 Possible cases:
// 1. _port._protocol.service.namespace.pod|svc.zone
// 2. (endpoint): endpoint.service.namespace.pod|svc.zone
// 3. (service): service.namespace.pod|svc.zone
base, _ := dnsutil.TrimZone(name, zone)
// return NODATA for apex queries
if base == "" || base == Svc || base == Pod {
return r, nil
}
segs := dns.SplitDomainName(base)
r.port = "*"
r.protocol = "*"
// for r.name, r.namespace and r.endpoint, we need to know if they have been set or not...
// For endpoint: if empty we should skip the endpoint check in k.get(). Hence we cannot set if to "*".
// For name: myns.svc.cluster.local != *.myns.svc.cluster.local
// For namespace: svc.cluster.local != *.svc.cluster.local
// start at the right and fill out recordRequest with the bits we find, so we look for
// pod|svc.namespace.service and then either
// * endpoint
// *_protocol._port
last := len(segs) - 1
if last < 0 {
return r, nil
}
r.podOrSvc = segs[last]
if r.podOrSvc != Pod && r.podOrSvc != Svc {
return r, errInvalidRequest
}
last--
if last < 0 {
return r, nil
}
r.namespace = segs[last]
last--
if last < 0 {
return r, nil
}
r.service = segs[last]
last--
if last < 0 {
return r, nil
}
// Because of ambiguity we check the labels left: 1: an endpoint. 2: port and protocol.
// Anything else is a query that is too long to answer and can safely be delegated to return an nxdomain.
switch last {
case 0: // endpoint only
r.endpoint = segs[last]
case 1: // service and port
r.protocol = stripUnderscore(segs[last])
r.port = stripUnderscore(segs[last-1])
default: // too long
return r, errInvalidRequest
}
return r, nil
}
// stripUnderscore removes a prefixed underscore from s.
func stripUnderscore(s string) string {
if s[0] != '_' {
return s
}
return s[1:]
}
// String returns a string representation of r, it just returns all fields concatenated with dots.
// This is mostly used in tests.
func (r recordRequest) String() string {
s := r.port
s += "." + r.protocol
s += "." + r.endpoint
s += "." + r.service
s += "." + r.namespace
s += "." + r.podOrSvc
return s
}

62
parse_test.go Normal file
View File

@ -0,0 +1,62 @@
package multicluster
import (
"testing"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
)
func TestParseRequest(t *testing.T) {
tests := []struct {
query string
expected string // output from r.String()
}{
// valid SRV request
{"_http._tcp.webs.mynamespace.svc.inter.webs.tests.", "http.tcp..webs.mynamespace.svc"},
// wildcard acceptance
{"*.any.*.any.svc.inter.webs.tests.", "*.any..*.any.svc"},
// A request of endpoint
{"1-2-3-4.webs.mynamespace.svc.inter.webs.tests.", "*.*.1-2-3-4.webs.mynamespace.svc"},
// bare zone
{"inter.webs.tests.", "....."},
// bare svc type
{"svc.inter.webs.tests.", "....."},
// bare pod type
{"pod.inter.webs.tests.", "....."},
}
for i, tc := range tests {
m := new(dns.Msg)
m.SetQuestion(tc.query, dns.TypeA)
state := request.Request{Zone: zone, Req: m}
r, e := parseRequest(state.Name(), state.Zone)
if e != nil {
t.Errorf("Test %d, expected no error, got '%v'.", i, e)
}
rs := r.String()
if rs != tc.expected {
t.Errorf("Test %d, expected (stringified) recordRequest: %s, got %s", i, tc.expected, rs)
}
}
}
func TestParseInvalidRequest(t *testing.T) {
invalid := []string{
"webs.mynamespace.pood.inter.webs.test.", // Request must be for pod or svc subdomain.
"too.long.for.what.I.am.trying.to.pod.inter.webs.tests.", // Too long.
}
for i, query := range invalid {
m := new(dns.Msg)
m.SetQuestion(query, dns.TypeA)
state := request.Request{Zone: zone, Req: m}
if _, e := parseRequest(state.Name(), state.Zone); e == nil {
t.Errorf("Test %d: expected error from %s, got none", i, query)
}
}
}
const zone = "inter.webs.tests."

5
ready.go Normal file
View File

@ -0,0 +1,5 @@
package multicluster
// Ready implements the ready.Readiness interface.
//func (m *MultiCluster) Ready() bool { return m.controller.HasSynced() }
func (m *MultiCluster) Ready() bool { return true}

73
setup.go Normal file
View File

@ -0,0 +1,73 @@
package multicluster
import (
"context"
"github.com/coredns/caddy"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
"k8s.io/client-go/tools/clientcmd"
)
const pluginName = "multicluster"
// init registers this plugin.
func init() { plugin.Register(pluginName, setup) }
func setup(c *caddy.Controller) error {
c.Next() // Skip "multicluster" label
multiCluster, err := ParseStanza(c)
if err != nil {
return plugin.Error(pluginName, err)
}
onStart, onShut, err := multiCluster.InitController(context.Background())
if err != nil {
return plugin.Error(pluginName, err)
}
if onStart != nil {
c.OnStartup(onStart)
}
if onShut != nil {
c.OnShutdown(onShut)
}
// Add the Plugin to CoreDNS, so Servers can use it in their plugin chain.
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
multiCluster.Next = next
return multiCluster
})
return nil
}
// ParseStanza parses a kubernetes stanza
func ParseStanza(c *caddy.Controller) (*MultiCluster, error) {
zones := plugin.OriginsFromArgsOrServerBlock(c.RemainingArgs(), c.ServerBlockKeys)
multiCluster := New(zones)
for c.NextBlock() {
switch c.Val() {
case "kubeconfig":
args := c.RemainingArgs()
if len(args) != 1 && len(args) != 2 {
return nil, c.ArgErr()
}
overrides := &clientcmd.ConfigOverrides{}
if len(args) == 2 {
overrides.CurrentContext = args[1]
}
config := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: args[0]},
overrides,
)
multiCluster.ClientConfig = config
case "fallthrough":
multiCluster.Fall.SetZonesFromArgs(c.RemainingArgs())
default:
return nil, c.Errf("unknown property '%s'", c.Val())
}
}
return multiCluster, nil
}