multicluster/controller.go

390 lines
10 KiB
Go

package multicluster
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
k8sObject "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/multicluster/object"
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
)
const (
svcNameNamespaceIndex = "ServiceNameNamespace"
epNameNamespaceIndex = "EndpointNameNamespace"
)
type controller interface {
ServiceList() []*object.ServiceImport
EndpointsList() []*object.Endpoints
SvcIndex(string) []*object.ServiceImport
EpIndex(string) []*object.Endpoints
GetNamespaceByName(string) (*k8sObject.Namespace, error)
Run()
HasSynced() bool
Stop() error
// Modified returns the timestamp of the most recent changes
Modified() int64
}
type control struct {
// Modified tracks timestamp of the most recent changes
// It needs to be first because it is guaranteed to be 8-byte
// aligned ( we use sync.LoadAtomic with this )
modified int64
k8sClient kubernetes.Interface
mcsClient mcsClientset.MulticlusterV1alpha1Interface
svcImportController cache.Controller
svcImportLister cache.Indexer
nsController cache.Controller
nsLister cache.Store
epController cache.Controller
epLister cache.Indexer
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
}
type controllerOpts struct {
initEndpointsCache bool
}
func newController(ctx context.Context, k8sClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts controllerOpts) *control {
ctl := control{
k8sClient: k8sClient,
mcsClient: mcsClient,
stopCh: make(chan struct{}),
}
// enable ServiceImport watch
ctl.watchServiceImport(ctx)
// enable Namespace watch
ctl.watchNamespace(ctx)
if opts.initEndpointsCache {
ctl.watchEndpointSlice(ctx)
}
return &ctl
}
func (c *control) watchServiceImport(ctx context.Context) {
c.svcImportLister, c.svcImportController = k8sObject.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceImportListFunc(ctx, c.mcsClient, api.NamespaceAll),
WatchFunc: serviceImportWatchFunc(ctx, c.mcsClient, api.NamespaceAll),
},
&mcs.ServiceImport{},
cache.ResourceEventHandlerFuncs{AddFunc: c.Add, UpdateFunc: c.Update, DeleteFunc: c.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc},
k8sObject.DefaultProcessor(object.ToServiceImport, nil),
)
}
func (c *control) watchNamespace(ctx context.Context) {
c.nsLister, c.nsController = k8sObject.NewIndexerInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(ctx, c.k8sClient),
WatchFunc: namespaceWatchFunc(ctx, c.k8sClient),
},
&api.Namespace{},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{},
k8sObject.DefaultProcessor(k8sObject.ToNamespace, nil),
)
}
func (c *control) watchEndpointSlice(ctx context.Context) {
c.epLister, c.epController = k8sObject.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, c.k8sClient, api.NamespaceAll),
WatchFunc: endpointSliceWatchFunc(ctx, c.k8sClient, api.NamespaceAll),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: c.Add, UpdateFunc: c.Update, DeleteFunc: c.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc},
k8sObject.DefaultProcessor(object.EndpointSliceToEndpoints, nil),
)
}
// Stop stops the controller.
func (c *control) Stop() error {
c.stopLock.Lock()
defer c.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !c.shutdown {
close(c.stopCh)
c.shutdown = true
return nil
}
return fmt.Errorf("shutdown already in progress")
}
// Run starts the controller.
func (c *control) Run() {
go c.svcImportController.Run(c.stopCh)
go c.nsController.Run(c.stopCh)
if c.epController != nil {
c.epController.Run(c.stopCh)
}
<-c.stopCh
}
// HasSynced calls on all controllers.
func (c *control) HasSynced() bool {
return c.svcImportController.HasSynced() && c.nsController.HasSynced()
}
func (c *control) SvcIndex(idx string) (svcs []*object.ServiceImport) {
os, err := c.svcImportLister.ByIndex(svcNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
s, ok := o.(*object.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (c *control) ServiceList() (svcs []*object.ServiceImport) {
os := c.svcImportLister.List()
for _, o := range os {
s, ok := o.(*object.ServiceImport)
if !ok {
continue
}
svcs = append(svcs, s)
}
return svcs
}
func (c *control) EndpointsList() (eps []*object.Endpoints) {
os := c.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
if !ok {
continue
}
eps = append(eps, ep)
}
return eps
}
func (c *control) EpIndex(idx string) (ep []*object.Endpoints) {
os, err := c.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
e, ok := o.(*object.Endpoints)
if !ok {
continue
}
ep = append(ep, e)
}
return ep
}
func serviceImportListFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
return c.ServiceImports(ns).List(ctx, opts)
}
}
func serviceImportWatchFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
return func(opts meta.ListOptions) (watch.Interface, error) {
return c.ServiceImports(ns).Watch(ctx, opts)
}
}
func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
return c.CoreV1().Namespaces().List(ctx, opts)
}
}
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
return func(opts meta.ListOptions) (watch.Interface, error) {
return c.CoreV1().Namespaces().Watch(ctx, opts)
}
}
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
opts.LabelSelector = mcs.LabelServiceName // only slices created by MCS controller
return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
}
}
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
return func(opts meta.ListOptions) (watch.Interface, error) {
opts.LabelSelector = mcs.LabelServiceName // only slices created by MCS controller
return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, opts)
}
}
// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
func (c *control) GetNamespaceByName(name string) (*k8sObject.Namespace, error) {
o, exists, err := c.nsLister.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("namespace not found")
}
ns, ok := o.(*k8sObject.Namespace)
if !ok {
return nil, fmt.Errorf("found key but not namespace")
}
return ns, nil
}
func (c *control) Add(obj interface{}) { c.updateModified() }
func (c *control) Delete(obj interface{}) { c.updateModified() }
func (c *control) Update(oldObj, newObj interface{}) { c.detectChanges(oldObj, newObj) }
// detectChanges detects changes in objects, and updates the modified timestamp
func (c *control) detectChanges(oldObj, newObj interface{}) {
// If both objects have the same resource version, they are identical.
if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
return
}
obj := newObj
if obj == nil {
obj = oldObj
}
switch ob := obj.(type) {
case *object.ServiceImport:
c.updateModified()
case *object.Endpoints:
if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) {
c.updateModified()
}
default:
log.Warningf("Updates for %T not supported.", ob)
}
}
// endpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func endpointsEquivalent(a, b *object.Endpoints) bool {
if a == nil || b == nil {
return false
}
if len(a.Subsets) != len(b.Subsets) {
return false
}
if a.ClusterId != b.ClusterId {
return false
}
// we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for i, sa := range a.Subsets {
sb := b.Subsets[i]
if !subsetsEquivalent(sa, sb) {
return false
}
}
return true
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
func subsetsEquivalent(sa, sb k8sObject.EndpointSubset) bool {
if len(sa.Addresses) != len(sb.Addresses) {
return false
}
if len(sa.Ports) != len(sb.Ports) {
return false
}
// in Addresses and Ports, we should be able to rely on
// these being sorted and able to be compared
// they are supposed to be in a canonical format
for addr, aaddr := range sa.Addresses {
baddr := sb.Addresses[addr]
if aaddr.IP != baddr.IP {
return false
}
if aaddr.Hostname != baddr.Hostname {
return false
}
}
for port, aport := range sa.Ports {
bport := sb.Ports[port]
if aport.Name != bport.Name {
return false
}
if aport.Port != bport.Port {
return false
}
if aport.Protocol != bport.Protocol {
return false
}
}
return true
}
func (c *control) Modified() int64 {
unix := atomic.LoadInt64(&c.modified)
return unix
}
func (c *control) updateModified() {
unix := time.Now().Unix()
atomic.StoreInt64(&c.modified, unix)
}
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*object.ServiceImport)
if !ok {
return nil, errors.New("obj was not of the correct type")
}
return []string{s.Index}, nil
}
func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
s, ok := obj.(*object.Endpoints)
if !ok {
return nil, errors.New("obj was not of the correct type")
}
return []string{s.Index}, nil
}