package destination import ( "fmt" "slices" "strings" "sync" pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/k8s" labels "github.com/linkerd/linkerd2/pkg/k8s" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) // FederatedServiceWatcher watches federated services for the local discovery // and remote discovery annotations and subscribes to the approprite local and // remote services. type federatedServiceWatcher struct { services map[watcher.ServiceID]*federatedService k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI config *Config clusterStore *watcher.ClusterStore localEndpoints *watcher.EndpointsWatcher log *logging.Entry sync.RWMutex } type remoteDiscoveryID struct { cluster string service watcher.ServiceID } // FederatedService represents a federated service and it may have a local // discovery target and remote discovery targets. This struct holds a list of // subsribers that are subscribed to the federated service. type federatedService struct { namespace string localDiscovery string remoteDiscovery []remoteDiscoveryID subscribers []federatedServiceSubscriber metadataAPI *k8s.MetadataAPI config *Config localEndpoints *watcher.EndpointsWatcher clusterStore *watcher.ClusterStore log *logging.Entry sync.Mutex } // FederatedServiceSubscriber holds all the state for an individual subscriber // stream to a federated service. type federatedServiceSubscriber struct { port uint32 nodeName string instanceID string localTranslators map[string]*endpointTranslator remoteTranslators map[remoteDiscoveryID]*endpointTranslator stream *synchronizedGetStream endStream chan struct{} } func newFederatedServiceWatcher( k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, config *Config, clusterStore *watcher.ClusterStore, localEndpoints *watcher.EndpointsWatcher, log *logging.Entry, ) (*federatedServiceWatcher, error) { fsw := &federatedServiceWatcher{ services: make(map[watcher.ServiceID]*federatedService), k8sAPI: k8sAPI, metadataAPI: metadataAPI, config: config, clusterStore: clusterStore, localEndpoints: localEndpoints, log: log.WithFields(logging.Fields{ "component": "federated-service-watcher", }), } var err error _, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: fsw.addService, DeleteFunc: fsw.deleteService, UpdateFunc: fsw.updateService, }) if err != nil { return nil, err } return fsw, nil } func (fsw *federatedServiceWatcher) Subscribe( service string, namespace string, port uint32, nodeName string, instanceID string, stream pb.Destination_GetServer, endStream chan struct{}, ) error { id := watcher.ServiceID{Namespace: namespace, Name: service} fsw.RLock() if federatedService, ok := fsw.services[id]; ok { fsw.RUnlock() fsw.log.Debugf("Subscribing to federated service %s/%s", namespace, service) federatedService.subscribe(port, nodeName, instanceID, stream, endStream) return nil } else { fsw.RUnlock() } return fmt.Errorf("service %s/%s is not a federated service", namespace, service) } func (fsw *federatedServiceWatcher) Unsubscribe( service string, namespace string, stream pb.Destination_GetServer, ) { id := watcher.ServiceID{Namespace: namespace, Name: service} fsw.RLock() if federatedService, ok := fsw.services[id]; ok { fsw.RUnlock() fsw.log.Debugf("Unsubscribing from federated service %s/%s", namespace, service) federatedService.unsubscribe(stream) } else { fsw.RUnlock() } } func (fsw *federatedServiceWatcher) addService(obj interface{}) { service := obj.(*corev1.Service) id := watcher.ServiceID{ Namespace: service.Namespace, Name: service.Name, } if isFederatedService(service) { fsw.Lock() if federatedService, ok := fsw.services[id]; ok { fsw.Unlock() fsw.log.Debugf("Updating federated service %s/%s", service.Namespace, service.Name) federatedService.update(service) } else { fsw.log.Debugf("Adding federated service %s/%s", service.Namespace, service.Name) federatedService = fsw.newFederatedService(service) fsw.services[id] = federatedService fsw.Unlock() federatedService.update(service) } } else { fsw.Lock() if federatedService, ok := fsw.services[id]; ok { delete(fsw.services, id) fsw.Unlock() fsw.log.Debugf("Service %s/%s is no longer a federated service", service.Namespace, service.Name) federatedService.delete() } else { fsw.Unlock() } } } func (fsw *federatedServiceWatcher) updateService(oldObj interface{}, newObj interface{}) { fsw.addService(newObj) } func (fsw *federatedServiceWatcher) deleteService(obj interface{}) { service, ok := obj.(*corev1.Service) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { fsw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj) return } service, ok = tombstone.Obj.(*corev1.Service) if !ok { fsw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj) return } } id := watcher.ServiceID{ Namespace: service.Namespace, Name: service.Name, } fsw.Lock() if federatedService, ok := fsw.services[id]; ok { delete(fsw.services, id) fsw.Unlock() federatedService.delete() } else { fsw.Unlock() } } func (fsw *federatedServiceWatcher) newFederatedService(service *corev1.Service) *federatedService { return &federatedService{ namespace: service.Namespace, localDiscovery: service.Annotations[labels.LocalDiscoveryAnnotation], remoteDiscovery: remoteDiscoveryIDs(service, fsw.log), subscribers: []federatedServiceSubscriber{}, metadataAPI: fsw.metadataAPI, config: fsw.config, localEndpoints: fsw.localEndpoints, clusterStore: fsw.clusterStore, log: fsw.log.WithFields(logging.Fields{"service": service.Name, "namespace": service.Namespace}), } } func (fs *federatedService) update(service *corev1.Service) { fs.Lock() defer fs.Unlock() newRemoteDiscovery := remoteDiscoveryIDs(service, fs.log) for _, id := range newRemoteDiscovery { if !slices.Contains(fs.remoteDiscovery, id) { for i := range fs.subscribers { fs.remoteDiscoverySubscribe(&fs.subscribers[i], id) } } } for _, id := range fs.remoteDiscovery { if !slices.Contains(newRemoteDiscovery, id) { for i := range fs.subscribers { fs.remoteDiscoveryUnsubscribe(&fs.subscribers[i], id) } } } fs.remoteDiscovery = newRemoteDiscovery newLocalDiscovery := service.Annotations[labels.LocalDiscoveryAnnotation] if fs.localDiscovery != service.Annotations[labels.LocalDiscoveryAnnotation] { if newLocalDiscovery != "" { for i := range fs.subscribers { if fs.localDiscovery != "" { fs.localDiscoveryUnsubscribe(&fs.subscribers[i], fs.localDiscovery) } fs.localDiscoverySubscribe(&fs.subscribers[i], newLocalDiscovery) } } else { for i := range fs.subscribers { fs.localDiscoveryUnsubscribe(&fs.subscribers[i], fs.localDiscovery) } } } fs.localDiscovery = newLocalDiscovery } func (fs *federatedService) delete() { fs.Lock() defer fs.Unlock() for _, subscriber := range fs.subscribers { for id, translator := range subscriber.remoteTranslators { remoteWatcher, _, found := fs.clusterStore.Get(id.cluster) if !found { fs.log.Errorf("Failed to get remote cluster %s", id.cluster) continue } remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator) translator.Stop() } for localDiscovery, translator := range subscriber.localTranslators { fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator) translator.Stop() } close(subscriber.endStream) } } func (fs *federatedService) subscribe( port uint32, nodeName string, instanceID string, stream pb.Destination_GetServer, endStream chan struct{}, ) { fs.Lock() defer fs.Unlock() syncStream := newSyncronizedGetStream(stream, fs.log) syncStream.Start() subscriber := federatedServiceSubscriber{ stream: syncStream, endStream: endStream, remoteTranslators: make(map[remoteDiscoveryID]*endpointTranslator, 0), localTranslators: make(map[string]*endpointTranslator, 0), port: port, nodeName: nodeName, instanceID: instanceID, } for _, id := range fs.remoteDiscovery { fs.remoteDiscoverySubscribe(&subscriber, id) } if fs.localDiscovery != "" { fs.localDiscoverySubscribe(&subscriber, fs.localDiscovery) } fs.subscribers = append(fs.subscribers, subscriber) } func (fs *federatedService) unsubscribe( stream pb.Destination_GetServer, ) { fs.Lock() defer fs.Unlock() subscribers := make([]federatedServiceSubscriber, 0) for i, subscriber := range fs.subscribers { if subscriber.stream.inner == stream { for id := range subscriber.remoteTranslators { fs.remoteDiscoveryUnsubscribe(&fs.subscribers[i], id) } for localDiscovery := range subscriber.localTranslators { fs.localDiscoveryUnsubscribe(&fs.subscribers[i], localDiscovery) } subscriber.stream.Stop() } else { subscribers = append(subscribers, subscriber) } } fs.subscribers = subscribers } func (fs *federatedService) remoteDiscoverySubscribe( subscriber *federatedServiceSubscriber, id remoteDiscoveryID, ) { remoteWatcher, remoteConfig, found := fs.clusterStore.Get(id.cluster) if !found { fs.log.Errorf("Failed to get remote cluster %s", id.cluster) return } translator := newEndpointTranslator( fs.config.ControllerNS, remoteConfig.TrustDomain, fs.config.ForceOpaqueTransport, fs.config.EnableH2Upgrade, false, // Disable endpoint filtering for remote discovery. fs.config.EnableIPv6, fs.config.ExtEndpointZoneWeights, fs.config.MeshedHttp2ClientParams, fmt.Sprintf("%s.%s.svc.%s:%d", id.service, fs.namespace, remoteConfig.ClusterDomain, subscriber.port), subscriber.nodeName, fs.config.DefaultOpaquePorts, fs.metadataAPI, subscriber.stream, subscriber.endStream, fs.log, ) translator.Start() subscriber.remoteTranslators[id] = translator fs.log.Debugf("Subscribing to remote discovery service %s in cluster %s", id.service, id.cluster) err := remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator) if err != nil { fs.log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", id.service.Name, id.cluster, err) } } func (fs *federatedService) remoteDiscoveryUnsubscribe( subscriber *federatedServiceSubscriber, id remoteDiscoveryID, ) { remoteWatcher, _, found := fs.clusterStore.Get(id.cluster) if !found { fs.log.Errorf("Failed to get remote cluster %s", id.cluster) return } translator := subscriber.remoteTranslators[id] fs.log.Debugf("Unsubscribing from remote discovery service %s in cluster %s", id.service, id.cluster) remoteWatcher.Unsubscribe(id.service, subscriber.port, subscriber.instanceID, translator) translator.NoEndpoints(true) translator.DrainAndStop() delete(subscriber.remoteTranslators, id) } func (fs *federatedService) localDiscoverySubscribe( subscriber *federatedServiceSubscriber, localDiscovery string, ) { translator := newEndpointTranslator( fs.config.ControllerNS, fs.config.IdentityTrustDomain, fs.config.ForceOpaqueTransport, fs.config.EnableH2Upgrade, true, fs.config.EnableIPv6, fs.config.ExtEndpointZoneWeights, fs.config.MeshedHttp2ClientParams, localDiscovery, subscriber.nodeName, fs.config.DefaultOpaquePorts, fs.metadataAPI, subscriber.stream, subscriber.endStream, fs.log, ) translator.Start() subscriber.localTranslators[localDiscovery] = translator fs.log.Debugf("Subscribing to local discovery service %s", localDiscovery) err := fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator) if err != nil { fs.log.Errorf("Failed to subscribe to %s: %s", localDiscovery, err) } } func (fs *federatedService) localDiscoveryUnsubscribe( subscriber *federatedServiceSubscriber, localDiscovery string, ) { translator, found := subscriber.localTranslators[localDiscovery] if found { fs.log.Debugf("Unsubscribing to local discovery service %s", localDiscovery) fs.localEndpoints.Unsubscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator) translator.NoEndpoints(true) translator.DrainAndStop() delete(subscriber.localTranslators, localDiscovery) } } func remoteDiscoveryIDs(service *corev1.Service, log *logging.Entry) []remoteDiscoveryID { remoteDiscovery, remoteDiscoveryFound := service.Annotations[labels.RemoteDiscoveryAnnotation] if !remoteDiscoveryFound { return nil } remotes := strings.Split(remoteDiscovery, ",") ids := make([]remoteDiscoveryID, 0) for _, remote := range remotes { parts := strings.Split(remote, "@") if len(parts) != 2 { log.Errorf("Invalid remote discovery service '%s'", remote) continue } remoteSvc := parts[0] cluster := parts[1] ids = append(ids, remoteDiscoveryID{ cluster: cluster, service: watcher.ServiceID{ Namespace: service.Namespace, Name: remoteSvc, }, }) } return ids } func isFederatedService(service *corev1.Service) bool { _, localDiscoveryFound := service.Annotations[labels.LocalDiscoveryAnnotation] _, remoteDiscoveryFound := service.Annotations[labels.RemoteDiscoveryAnnotation] return localDiscoveryFound || remoteDiscoveryFound }