390 lines
10 KiB
Go
390 lines
10 KiB
Go
package multicluster
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
k8sObject "github.com/coredns/coredns/plugin/kubernetes/object"
|
|
"github.com/coredns/multicluster/object"
|
|
api "k8s.io/api/core/v1"
|
|
discovery "k8s.io/api/discovery/v1"
|
|
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/cache"
|
|
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
|
|
mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
|
|
)
|
|
|
|
const (
|
|
svcNameNamespaceIndex = "ServiceNameNamespace"
|
|
epNameNamespaceIndex = "EndpointNameNamespace"
|
|
)
|
|
|
|
type controller interface {
|
|
ServiceList() []*object.ServiceImport
|
|
EndpointsList() []*object.Endpoints
|
|
SvcIndex(string) []*object.ServiceImport
|
|
EpIndex(string) []*object.Endpoints
|
|
|
|
GetNamespaceByName(string) (*k8sObject.Namespace, error)
|
|
|
|
Run()
|
|
HasSynced() bool
|
|
Stop() error
|
|
|
|
// Modified returns the timestamp of the most recent changes
|
|
Modified() int64
|
|
}
|
|
|
|
type control struct {
|
|
// Modified tracks timestamp of the most recent changes
|
|
// It needs to be first because it is guaranteed to be 8-byte
|
|
// aligned ( we use sync.LoadAtomic with this )
|
|
modified int64
|
|
|
|
k8sClient kubernetes.Interface
|
|
mcsClient mcsClientset.MulticlusterV1alpha1Interface
|
|
|
|
svcImportController cache.Controller
|
|
svcImportLister cache.Indexer
|
|
|
|
nsController cache.Controller
|
|
nsLister cache.Store
|
|
|
|
epController cache.Controller
|
|
epLister cache.Indexer
|
|
|
|
// stopLock is used to enforce only a single call to Stop is active.
|
|
// Needed because we allow stopping through an http endpoint and
|
|
// allowing concurrent stoppers leads to stack traces.
|
|
stopLock sync.Mutex
|
|
shutdown bool
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
type controllerOpts struct {
|
|
initEndpointsCache bool
|
|
}
|
|
|
|
func newController(ctx context.Context, k8sClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts controllerOpts) *control {
|
|
ctl := control{
|
|
k8sClient: k8sClient,
|
|
mcsClient: mcsClient,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// enable ServiceImport watch
|
|
ctl.watchServiceImport(ctx)
|
|
|
|
// enable Namespace watch
|
|
ctl.watchNamespace(ctx)
|
|
|
|
if opts.initEndpointsCache {
|
|
ctl.watchEndpointSlice(ctx)
|
|
}
|
|
|
|
return &ctl
|
|
}
|
|
|
|
func (c *control) watchServiceImport(ctx context.Context) {
|
|
c.svcImportLister, c.svcImportController = k8sObject.NewIndexerInformer(
|
|
&cache.ListWatch{
|
|
ListFunc: serviceImportListFunc(ctx, c.mcsClient, api.NamespaceAll),
|
|
WatchFunc: serviceImportWatchFunc(ctx, c.mcsClient, api.NamespaceAll),
|
|
},
|
|
&mcs.ServiceImport{},
|
|
cache.ResourceEventHandlerFuncs{AddFunc: c.Add, UpdateFunc: c.Update, DeleteFunc: c.Delete},
|
|
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc},
|
|
k8sObject.DefaultProcessor(object.ToServiceImport, nil),
|
|
)
|
|
}
|
|
|
|
func (c *control) watchNamespace(ctx context.Context) {
|
|
c.nsLister, c.nsController = k8sObject.NewIndexerInformer(
|
|
&cache.ListWatch{
|
|
ListFunc: namespaceListFunc(ctx, c.k8sClient),
|
|
WatchFunc: namespaceWatchFunc(ctx, c.k8sClient),
|
|
},
|
|
&api.Namespace{},
|
|
cache.ResourceEventHandlerFuncs{},
|
|
cache.Indexers{},
|
|
k8sObject.DefaultProcessor(k8sObject.ToNamespace, nil),
|
|
)
|
|
}
|
|
|
|
func (c *control) watchEndpointSlice(ctx context.Context) {
|
|
c.epLister, c.epController = k8sObject.NewIndexerInformer(
|
|
&cache.ListWatch{
|
|
ListFunc: endpointSliceListFunc(ctx, c.k8sClient, api.NamespaceAll),
|
|
WatchFunc: endpointSliceWatchFunc(ctx, c.k8sClient, api.NamespaceAll),
|
|
},
|
|
&discovery.EndpointSlice{},
|
|
cache.ResourceEventHandlerFuncs{AddFunc: c.Add, UpdateFunc: c.Update, DeleteFunc: c.Delete},
|
|
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc},
|
|
k8sObject.DefaultProcessor(object.EndpointSliceToEndpoints, nil),
|
|
)
|
|
}
|
|
|
|
// Stop stops the controller.
|
|
func (c *control) Stop() error {
|
|
c.stopLock.Lock()
|
|
defer c.stopLock.Unlock()
|
|
|
|
// Only try draining the workqueue if we haven't already.
|
|
if !c.shutdown {
|
|
close(c.stopCh)
|
|
c.shutdown = true
|
|
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("shutdown already in progress")
|
|
}
|
|
|
|
// Run starts the controller.
|
|
func (c *control) Run() {
|
|
go c.svcImportController.Run(c.stopCh)
|
|
go c.nsController.Run(c.stopCh)
|
|
if c.epController != nil {
|
|
c.epController.Run(c.stopCh)
|
|
}
|
|
|
|
<-c.stopCh
|
|
}
|
|
|
|
// HasSynced calls on all controllers.
|
|
func (c *control) HasSynced() bool {
|
|
return c.svcImportController.HasSynced() && c.nsController.HasSynced()
|
|
}
|
|
|
|
func (c *control) SvcIndex(idx string) (svcs []*object.ServiceImport) {
|
|
os, err := c.svcImportLister.ByIndex(svcNameNamespaceIndex, idx)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for _, o := range os {
|
|
s, ok := o.(*object.ServiceImport)
|
|
if !ok {
|
|
continue
|
|
}
|
|
svcs = append(svcs, s)
|
|
}
|
|
return svcs
|
|
}
|
|
|
|
func (c *control) ServiceList() (svcs []*object.ServiceImport) {
|
|
os := c.svcImportLister.List()
|
|
for _, o := range os {
|
|
s, ok := o.(*object.ServiceImport)
|
|
if !ok {
|
|
continue
|
|
}
|
|
svcs = append(svcs, s)
|
|
}
|
|
return svcs
|
|
}
|
|
|
|
func (c *control) EndpointsList() (eps []*object.Endpoints) {
|
|
os := c.epLister.List()
|
|
for _, o := range os {
|
|
ep, ok := o.(*object.Endpoints)
|
|
if !ok {
|
|
continue
|
|
}
|
|
eps = append(eps, ep)
|
|
}
|
|
return eps
|
|
}
|
|
|
|
func (c *control) EpIndex(idx string) (ep []*object.Endpoints) {
|
|
os, err := c.epLister.ByIndex(epNameNamespaceIndex, idx)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
for _, o := range os {
|
|
e, ok := o.(*object.Endpoints)
|
|
if !ok {
|
|
continue
|
|
}
|
|
ep = append(ep, e)
|
|
}
|
|
return ep
|
|
}
|
|
|
|
func serviceImportListFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
|
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
|
return c.ServiceImports(ns).List(ctx, opts)
|
|
}
|
|
}
|
|
|
|
func serviceImportWatchFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
|
|
return func(opts meta.ListOptions) (watch.Interface, error) {
|
|
return c.ServiceImports(ns).Watch(ctx, opts)
|
|
}
|
|
}
|
|
|
|
func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
|
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
|
return c.CoreV1().Namespaces().List(ctx, opts)
|
|
}
|
|
}
|
|
|
|
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
|
|
return func(opts meta.ListOptions) (watch.Interface, error) {
|
|
return c.CoreV1().Namespaces().Watch(ctx, opts)
|
|
}
|
|
}
|
|
|
|
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
|
|
return func(opts meta.ListOptions) (runtime.Object, error) {
|
|
opts.LabelSelector = mcs.LabelServiceName // only slices created by MCS controller
|
|
return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
|
|
}
|
|
}
|
|
|
|
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
|
|
return func(opts meta.ListOptions) (watch.Interface, error) {
|
|
opts.LabelSelector = mcs.LabelServiceName // only slices created by MCS controller
|
|
return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, opts)
|
|
}
|
|
}
|
|
|
|
// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
|
|
func (c *control) GetNamespaceByName(name string) (*k8sObject.Namespace, error) {
|
|
o, exists, err := c.nsLister.GetByKey(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !exists {
|
|
return nil, fmt.Errorf("namespace not found")
|
|
}
|
|
ns, ok := o.(*k8sObject.Namespace)
|
|
if !ok {
|
|
return nil, fmt.Errorf("found key but not namespace")
|
|
}
|
|
return ns, nil
|
|
}
|
|
|
|
func (c *control) Add(obj interface{}) { c.updateModified() }
|
|
func (c *control) Delete(obj interface{}) { c.updateModified() }
|
|
func (c *control) Update(oldObj, newObj interface{}) { c.detectChanges(oldObj, newObj) }
|
|
|
|
// detectChanges detects changes in objects, and updates the modified timestamp
|
|
func (c *control) detectChanges(oldObj, newObj interface{}) {
|
|
// If both objects have the same resource version, they are identical.
|
|
if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
|
|
return
|
|
}
|
|
obj := newObj
|
|
if obj == nil {
|
|
obj = oldObj
|
|
}
|
|
switch ob := obj.(type) {
|
|
case *object.ServiceImport:
|
|
c.updateModified()
|
|
case *object.Endpoints:
|
|
if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) {
|
|
c.updateModified()
|
|
}
|
|
default:
|
|
log.Warningf("Updates for %T not supported.", ob)
|
|
}
|
|
}
|
|
|
|
// endpointsEquivalent checks if the update to an endpoint is something
|
|
// that matters to us or if they are effectively equivalent.
|
|
func endpointsEquivalent(a, b *object.Endpoints) bool {
|
|
if a == nil || b == nil {
|
|
return false
|
|
}
|
|
|
|
if len(a.Subsets) != len(b.Subsets) {
|
|
return false
|
|
}
|
|
if a.ClusterId != b.ClusterId {
|
|
return false
|
|
}
|
|
|
|
// we should be able to rely on
|
|
// these being sorted and able to be compared
|
|
// they are supposed to be in a canonical format
|
|
for i, sa := range a.Subsets {
|
|
sb := b.Subsets[i]
|
|
if !subsetsEquivalent(sa, sb) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
|
|
// I.e. that they have the same ready addresses, host names, ports (including protocol
|
|
// and service names for SRV)
|
|
func subsetsEquivalent(sa, sb k8sObject.EndpointSubset) bool {
|
|
if len(sa.Addresses) != len(sb.Addresses) {
|
|
return false
|
|
}
|
|
if len(sa.Ports) != len(sb.Ports) {
|
|
return false
|
|
}
|
|
|
|
// in Addresses and Ports, we should be able to rely on
|
|
// these being sorted and able to be compared
|
|
// they are supposed to be in a canonical format
|
|
for addr, aaddr := range sa.Addresses {
|
|
baddr := sb.Addresses[addr]
|
|
if aaddr.IP != baddr.IP {
|
|
return false
|
|
}
|
|
if aaddr.Hostname != baddr.Hostname {
|
|
return false
|
|
}
|
|
}
|
|
|
|
for port, aport := range sa.Ports {
|
|
bport := sb.Ports[port]
|
|
if aport.Name != bport.Name {
|
|
return false
|
|
}
|
|
if aport.Port != bport.Port {
|
|
return false
|
|
}
|
|
if aport.Protocol != bport.Protocol {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *control) Modified() int64 {
|
|
unix := atomic.LoadInt64(&c.modified)
|
|
return unix
|
|
}
|
|
|
|
func (c *control) updateModified() {
|
|
unix := time.Now().Unix()
|
|
atomic.StoreInt64(&c.modified, unix)
|
|
}
|
|
|
|
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
|
|
s, ok := obj.(*object.ServiceImport)
|
|
if !ok {
|
|
return nil, errors.New("obj was not of the correct type")
|
|
}
|
|
return []string{s.Index}, nil
|
|
}
|
|
|
|
func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
|
|
s, ok := obj.(*object.Endpoints)
|
|
if !ok {
|
|
return nil, errors.New("obj was not of the correct type")
|
|
}
|
|
return []string{s.Index}, nil
|
|
}
|