mirror of https://github.com/linkerd/linkerd2.git
473 lines
14 KiB
Go
473 lines
14 KiB
Go
package destination
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
|
|
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
|
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
|
"github.com/linkerd/linkerd2/controller/k8s"
|
|
labels "github.com/linkerd/linkerd2/pkg/k8s"
|
|
logging "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
// FederatedServiceWatcher watches federated services for the local discovery
|
|
// and remote discovery annotations and subscribes to the approprite local and
|
|
// remote services.
|
|
type federatedServiceWatcher struct {
|
|
services map[watcher.ServiceID]*federatedService
|
|
k8sAPI *k8s.API
|
|
metadataAPI *k8s.MetadataAPI
|
|
config *Config
|
|
clusterStore *watcher.ClusterStore
|
|
localEndpoints *watcher.EndpointsWatcher
|
|
|
|
log *logging.Entry
|
|
|
|
sync.RWMutex
|
|
}
|
|
|
|
type remoteDiscoveryID struct {
|
|
cluster string
|
|
service watcher.ServiceID
|
|
}
|
|
|
|
// FederatedService represents a federated service and it may have a local
|
|
// discovery target and remote discovery targets. This struct holds a list of
|
|
// subsribers that are subscribed to the federated service.
|
|
type federatedService struct {
|
|
namespace string
|
|
|
|
localDiscovery string
|
|
remoteDiscovery []remoteDiscoveryID
|
|
subscribers []federatedServiceSubscriber
|
|
|
|
metadataAPI *k8s.MetadataAPI
|
|
config *Config
|
|
localEndpoints *watcher.EndpointsWatcher
|
|
clusterStore *watcher.ClusterStore
|
|
log *logging.Entry
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// FederatedServiceSubscriber holds all the state for an individual subscriber
|
|
// stream to a federated service.
|
|
type federatedServiceSubscriber struct {
|
|
port uint32
|
|
nodeName string
|
|
instanceID string
|
|
|
|
localTranslators map[string]*endpointTranslator
|
|
remoteTranslators map[remoteDiscoveryID]*endpointTranslator
|
|
|
|
stream *synchronizedGetStream
|
|
endStream chan struct{}
|
|
}
|
|
|
|
func newFederatedServiceWatcher(
|
|
k8sAPI *k8s.API,
|
|
metadataAPI *k8s.MetadataAPI,
|
|
config *Config,
|
|
clusterStore *watcher.ClusterStore,
|
|
localEndpoints *watcher.EndpointsWatcher,
|
|
log *logging.Entry,
|
|
) (*federatedServiceWatcher, error) {
|
|
fsw := &federatedServiceWatcher{
|
|
services: make(map[watcher.ServiceID]*federatedService),
|
|
k8sAPI: k8sAPI,
|
|
metadataAPI: metadataAPI,
|
|
config: config,
|
|
clusterStore: clusterStore,
|
|
localEndpoints: localEndpoints,
|
|
log: log.WithFields(logging.Fields{
|
|
"component": "federated-service-watcher",
|
|
}),
|
|
}
|
|
|
|
var err error
|
|
_, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: fsw.addService,
|
|
DeleteFunc: fsw.deleteService,
|
|
UpdateFunc: fsw.updateService,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return fsw, nil
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) Subscribe(
|
|
service string,
|
|
namespace string,
|
|
port uint32,
|
|
nodeName string,
|
|
instanceID string,
|
|
stream pb.Destination_GetServer,
|
|
endStream chan struct{},
|
|
) error {
|
|
id := watcher.ServiceID{Namespace: namespace, Name: service}
|
|
fsw.RLock()
|
|
if federatedService, ok := fsw.services[id]; ok {
|
|
fsw.RUnlock()
|
|
fsw.log.Debugf("Subscribing to federated service %s/%s", namespace, service)
|
|
federatedService.subscribe(port, nodeName, instanceID, stream, endStream)
|
|
return nil
|
|
} else {
|
|
fsw.RUnlock()
|
|
}
|
|
return fmt.Errorf("service %s/%s is not a federated service", namespace, service)
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) Unsubscribe(
|
|
service string,
|
|
namespace string,
|
|
stream pb.Destination_GetServer,
|
|
) {
|
|
id := watcher.ServiceID{Namespace: namespace, Name: service}
|
|
fsw.RLock()
|
|
if federatedService, ok := fsw.services[id]; ok {
|
|
fsw.RUnlock()
|
|
fsw.log.Debugf("Unsubscribing from federated service %s/%s", namespace, service)
|
|
federatedService.unsubscribe(stream)
|
|
} else {
|
|
fsw.RUnlock()
|
|
}
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) addService(obj interface{}) {
|
|
service := obj.(*corev1.Service)
|
|
id := watcher.ServiceID{
|
|
Namespace: service.Namespace,
|
|
Name: service.Name,
|
|
}
|
|
|
|
if isFederatedService(service) {
|
|
fsw.Lock()
|
|
if federatedService, ok := fsw.services[id]; ok {
|
|
fsw.Unlock()
|
|
fsw.log.Debugf("Updating federated service %s/%s", service.Namespace, service.Name)
|
|
federatedService.update(service)
|
|
} else {
|
|
fsw.log.Debugf("Adding federated service %s/%s", service.Namespace, service.Name)
|
|
federatedService = fsw.newFederatedService(service)
|
|
fsw.services[id] = federatedService
|
|
fsw.Unlock()
|
|
federatedService.update(service)
|
|
}
|
|
} else {
|
|
fsw.Lock()
|
|
if federatedService, ok := fsw.services[id]; ok {
|
|
delete(fsw.services, id)
|
|
fsw.Unlock()
|
|
fsw.log.Debugf("Service %s/%s is no longer a federated service", service.Namespace, service.Name)
|
|
federatedService.delete()
|
|
} else {
|
|
fsw.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) updateService(oldObj interface{}, newObj interface{}) {
|
|
fsw.addService(newObj)
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) deleteService(obj interface{}) {
|
|
service, ok := obj.(*corev1.Service)
|
|
if !ok {
|
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
if !ok {
|
|
fsw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
|
|
return
|
|
}
|
|
service, ok = tombstone.Obj.(*corev1.Service)
|
|
if !ok {
|
|
fsw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
|
|
return
|
|
}
|
|
}
|
|
|
|
id := watcher.ServiceID{
|
|
Namespace: service.Namespace,
|
|
Name: service.Name,
|
|
}
|
|
fsw.Lock()
|
|
if federatedService, ok := fsw.services[id]; ok {
|
|
delete(fsw.services, id)
|
|
fsw.Unlock()
|
|
federatedService.delete()
|
|
} else {
|
|
fsw.Unlock()
|
|
}
|
|
|
|
}
|
|
|
|
func (fsw *federatedServiceWatcher) newFederatedService(service *corev1.Service) *federatedService {
|
|
return &federatedService{
|
|
namespace: service.Namespace,
|
|
|
|
localDiscovery: service.Annotations[labels.LocalDiscoveryAnnotation],
|
|
remoteDiscovery: remoteDiscoveryIDs(service, fsw.log),
|
|
subscribers: []federatedServiceSubscriber{},
|
|
|
|
metadataAPI: fsw.metadataAPI,
|
|
config: fsw.config,
|
|
localEndpoints: fsw.localEndpoints,
|
|
clusterStore: fsw.clusterStore,
|
|
log: fsw.log.WithFields(logging.Fields{"service": service.Name, "namespace": service.Namespace}),
|
|
}
|
|
}
|
|
|
|
func (fs *federatedService) update(service *corev1.Service) {
|
|
fs.Lock()
|
|
defer fs.Unlock()
|
|
|
|
newRemoteDiscovery := remoteDiscoveryIDs(service, fs.log)
|
|
for _, id := range newRemoteDiscovery {
|
|
if !slices.Contains(fs.remoteDiscovery, id) {
|
|
for i := range fs.subscribers {
|
|
fs.remoteDiscoverySubscribe(&fs.subscribers[i], id)
|
|
}
|
|
}
|
|
}
|
|
for _, id := range fs.remoteDiscovery {
|
|
if !slices.Contains(newRemoteDiscovery, id) {
|
|
for i := range fs.subscribers {
|
|
fs.remoteDiscoveryUnsubscribe(&fs.subscribers[i], id)
|
|
}
|
|
}
|
|
}
|
|
fs.remoteDiscovery = newRemoteDiscovery
|
|
|
|
newLocalDiscovery := service.Annotations[labels.LocalDiscoveryAnnotation]
|
|
if fs.localDiscovery != service.Annotations[labels.LocalDiscoveryAnnotation] {
|
|
if newLocalDiscovery != "" {
|
|
for i := range fs.subscribers {
|
|
if fs.localDiscovery != "" {
|
|
fs.localDiscoveryUnsubscribe(&fs.subscribers[i], fs.localDiscovery)
|
|
}
|
|
fs.localDiscoverySubscribe(&fs.subscribers[i], newLocalDiscovery)
|
|
}
|
|
} else {
|
|
for i := range fs.subscribers {
|
|
fs.localDiscoveryUnsubscribe(&fs.subscribers[i], fs.localDiscovery)
|
|
}
|
|
}
|
|
}
|
|
fs.localDiscovery = newLocalDiscovery
|
|
}
|
|
|
|
func (fs *federatedService) delete() {
|
|
fs.Lock()
|
|
defer fs.Unlock()
|
|
|
|
for _, subscriber := range fs.subscribers {
|
|
for id, translator := range subscriber.remoteTranslators {
|
|
remoteWatcher, _, found := fs.clusterStore.Get(id.cluster)
|
|
if !found {
|
|
fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
|
|
continue
|
|
}
|
|
remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator)
|
|
translator.Stop()
|
|
}
|
|
for localDiscovery, translator := range subscriber.localTranslators {
|
|
fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
|
|
translator.Stop()
|
|
}
|
|
close(subscriber.endStream)
|
|
}
|
|
}
|
|
|
|
func (fs *federatedService) subscribe(
|
|
port uint32,
|
|
nodeName string,
|
|
instanceID string,
|
|
stream pb.Destination_GetServer,
|
|
endStream chan struct{},
|
|
) {
|
|
fs.Lock()
|
|
defer fs.Unlock()
|
|
|
|
syncStream := newSyncronizedGetStream(stream, fs.log)
|
|
syncStream.Start()
|
|
|
|
subscriber := federatedServiceSubscriber{
|
|
stream: syncStream,
|
|
endStream: endStream,
|
|
remoteTranslators: make(map[remoteDiscoveryID]*endpointTranslator, 0),
|
|
localTranslators: make(map[string]*endpointTranslator, 0),
|
|
port: port,
|
|
nodeName: nodeName,
|
|
instanceID: instanceID,
|
|
}
|
|
for _, id := range fs.remoteDiscovery {
|
|
fs.remoteDiscoverySubscribe(&subscriber, id)
|
|
}
|
|
if fs.localDiscovery != "" {
|
|
fs.localDiscoverySubscribe(&subscriber, fs.localDiscovery)
|
|
}
|
|
|
|
fs.subscribers = append(fs.subscribers, subscriber)
|
|
}
|
|
|
|
func (fs *federatedService) unsubscribe(
|
|
stream pb.Destination_GetServer,
|
|
) {
|
|
fs.Lock()
|
|
defer fs.Unlock()
|
|
|
|
subscribers := make([]federatedServiceSubscriber, 0)
|
|
for i, subscriber := range fs.subscribers {
|
|
if subscriber.stream.inner == stream {
|
|
for id := range subscriber.remoteTranslators {
|
|
fs.remoteDiscoveryUnsubscribe(&fs.subscribers[i], id)
|
|
}
|
|
for localDiscovery := range subscriber.localTranslators {
|
|
fs.localDiscoveryUnsubscribe(&fs.subscribers[i], localDiscovery)
|
|
}
|
|
subscriber.stream.Stop()
|
|
} else {
|
|
subscribers = append(subscribers, subscriber)
|
|
}
|
|
}
|
|
fs.subscribers = subscribers
|
|
}
|
|
|
|
func (fs *federatedService) remoteDiscoverySubscribe(
|
|
subscriber *federatedServiceSubscriber,
|
|
id remoteDiscoveryID,
|
|
) {
|
|
remoteWatcher, remoteConfig, found := fs.clusterStore.Get(id.cluster)
|
|
if !found {
|
|
fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
|
|
return
|
|
}
|
|
|
|
translator := newEndpointTranslator(
|
|
fs.config.ControllerNS,
|
|
remoteConfig.TrustDomain,
|
|
fs.config.ForceOpaqueTransport,
|
|
fs.config.EnableH2Upgrade,
|
|
false, // Disable endpoint filtering for remote discovery.
|
|
fs.config.EnableIPv6,
|
|
fs.config.ExtEndpointZoneWeights,
|
|
fs.config.MeshedHttp2ClientParams,
|
|
fmt.Sprintf("%s.%s.svc.%s:%d", id.service, fs.namespace, remoteConfig.ClusterDomain, subscriber.port),
|
|
subscriber.nodeName,
|
|
fs.config.DefaultOpaquePorts,
|
|
fs.metadataAPI,
|
|
subscriber.stream,
|
|
subscriber.endStream,
|
|
fs.log,
|
|
)
|
|
translator.Start()
|
|
subscriber.remoteTranslators[id] = translator
|
|
|
|
fs.log.Debugf("Subscribing to remote discovery service %s in cluster %s", id.service, id.cluster)
|
|
err := remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
|
|
if err != nil {
|
|
fs.log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
|
|
}
|
|
}
|
|
|
|
func (fs *federatedService) remoteDiscoveryUnsubscribe(
|
|
subscriber *federatedServiceSubscriber,
|
|
id remoteDiscoveryID,
|
|
) {
|
|
remoteWatcher, _, found := fs.clusterStore.Get(id.cluster)
|
|
if !found {
|
|
fs.log.Errorf("Failed to get remote cluster %s", id.cluster)
|
|
return
|
|
}
|
|
|
|
translator := subscriber.remoteTranslators[id]
|
|
fs.log.Debugf("Unsubscribing from remote discovery service %s in cluster %s", id.service, id.cluster)
|
|
remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator)
|
|
translator.NoEndpoints(true)
|
|
translator.DrainAndStop()
|
|
delete(subscriber.remoteTranslators, id)
|
|
}
|
|
|
|
func (fs *federatedService) localDiscoverySubscribe(
|
|
subscriber *federatedServiceSubscriber,
|
|
localDiscovery string,
|
|
) {
|
|
translator := newEndpointTranslator(
|
|
fs.config.ControllerNS,
|
|
fs.config.IdentityTrustDomain,
|
|
fs.config.ForceOpaqueTransport,
|
|
fs.config.EnableH2Upgrade,
|
|
true,
|
|
fs.config.EnableIPv6,
|
|
fs.config.ExtEndpointZoneWeights,
|
|
fs.config.MeshedHttp2ClientParams,
|
|
localDiscovery,
|
|
subscriber.nodeName,
|
|
fs.config.DefaultOpaquePorts,
|
|
fs.metadataAPI,
|
|
subscriber.stream,
|
|
subscriber.endStream,
|
|
fs.log,
|
|
)
|
|
translator.Start()
|
|
subscriber.localTranslators[localDiscovery] = translator
|
|
|
|
fs.log.Debugf("Subscribing to local discovery service %s", localDiscovery)
|
|
err := fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
|
|
if err != nil {
|
|
fs.log.Errorf("Failed to subscribe to %s: %s", localDiscovery, err)
|
|
}
|
|
}
|
|
|
|
func (fs *federatedService) localDiscoveryUnsubscribe(
|
|
subscriber *federatedServiceSubscriber,
|
|
localDiscovery string,
|
|
) {
|
|
translator, found := subscriber.localTranslators[localDiscovery]
|
|
if found {
|
|
fs.log.Debugf("Unsubscribing to local discovery service %s", localDiscovery)
|
|
fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
|
|
translator.NoEndpoints(true)
|
|
translator.DrainAndStop()
|
|
delete(subscriber.localTranslators, localDiscovery)
|
|
}
|
|
}
|
|
|
|
func remoteDiscoveryIDs(service *corev1.Service, log *logging.Entry) []remoteDiscoveryID {
|
|
remoteDiscovery, remoteDiscoveryFound := service.Annotations[labels.RemoteDiscoveryAnnotation]
|
|
if !remoteDiscoveryFound {
|
|
return nil
|
|
}
|
|
|
|
remotes := strings.Split(remoteDiscovery, ",")
|
|
ids := make([]remoteDiscoveryID, 0)
|
|
for _, remote := range remotes {
|
|
parts := strings.Split(remote, "@")
|
|
if len(parts) != 2 {
|
|
log.Errorf("Invalid remote discovery service '%s'", remote)
|
|
continue
|
|
}
|
|
remoteSvc := parts[0]
|
|
cluster := parts[1]
|
|
ids = append(ids, remoteDiscoveryID{
|
|
cluster: cluster,
|
|
service: watcher.ServiceID{
|
|
Namespace: service.Namespace,
|
|
Name: remoteSvc,
|
|
},
|
|
})
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func isFederatedService(service *corev1.Service) bool {
|
|
_, localDiscoveryFound := service.Annotations[labels.LocalDiscoveryAnnotation]
|
|
_, remoteDiscoveryFound := service.Annotations[labels.RemoteDiscoveryAnnotation]
|
|
return localDiscoveryFound || remoteDiscoveryFound
|
|
}
|