mirror of https://github.com/linkerd/linkerd2.git
				
				
				
			
		
			
				
	
	
		
			468 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			468 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
| package watcher
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/linkerd/linkerd2/controller/k8s"
 | |
| 	logging "github.com/sirupsen/logrus"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 	corev1 "k8s.io/api/core/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| )
 | |
| 
 | |
| type (
 | |
| 	// IPWatcher wraps a EndpointsWatcher and allows subscriptions by
 | |
| 	// IP address.  It watches all services in the cluster to keep an index
 | |
| 	// of service by cluster IP and translates subscriptions by IP address into
 | |
| 	// subscriptions on the EndpointWatcher by service name.
 | |
| 	IPWatcher struct {
 | |
| 		publishers map[string]*serviceSubscriptions
 | |
| 		endpoints  *EndpointsWatcher
 | |
| 		k8sAPI     *k8s.API
 | |
| 
 | |
| 		log          *logging.Entry
 | |
| 		sync.RWMutex // This mutex protects modification of the map itself.
 | |
| 	}
 | |
| 
 | |
| 	serviceSubscriptions struct {
 | |
| 		clusterIP string
 | |
| 
 | |
| 		// At most one of service or pod may be non-zero.
 | |
| 		service ServiceID
 | |
| 		pod     AddressSet
 | |
| 
 | |
| 		listeners map[EndpointUpdateListener]Port
 | |
| 		endpoints *EndpointsWatcher
 | |
| 
 | |
| 		log *logging.Entry
 | |
| 		// All access to the servicePublisher and its portPublishers is explicitly synchronized by
 | |
| 		// this mutex.
 | |
| 		sync.Mutex
 | |
| 	}
 | |
| )
 | |
| 
 | |
| // WithPort sets the port field in all addresses of an address set.
 | |
| func (as AddressSet) WithPort(port Port) AddressSet {
 | |
| 	wp := AddressSet{
 | |
| 		Addresses: map[PodID]Address{},
 | |
| 		Labels:    as.Labels,
 | |
| 	}
 | |
| 	for id, addr := range as.Addresses {
 | |
| 		addr.Port = port
 | |
| 		wp.Addresses[id] = addr
 | |
| 	}
 | |
| 	return wp
 | |
| }
 | |
| 
 | |
| // NewIPWatcher creates an IPWatcher and begins watching the k8sAPI for service
 | |
| // changes.
 | |
| func NewIPWatcher(k8sAPI *k8s.API, endpoints *EndpointsWatcher, log *logging.Entry) *IPWatcher {
 | |
| 	iw := &IPWatcher{
 | |
| 		publishers: make(map[string]*serviceSubscriptions),
 | |
| 		endpoints:  endpoints,
 | |
| 		k8sAPI:     k8sAPI,
 | |
| 		log: log.WithFields(logging.Fields{
 | |
| 			"component": "ip-watcher",
 | |
| 		}),
 | |
| 	}
 | |
| 
 | |
| 	k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{podIPIndex: func(obj interface{}) ([]string, error) {
 | |
| 		if svc, ok := obj.(*corev1.Service); ok {
 | |
| 			return []string{svc.Spec.ClusterIP}, nil
 | |
| 		}
 | |
| 		return []string{""}, fmt.Errorf("object is not a service")
 | |
| 	}})
 | |
| 
 | |
| 	k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    iw.addService,
 | |
| 		DeleteFunc: iw.deleteService,
 | |
| 		UpdateFunc: func(before interface{}, after interface{}) {
 | |
| 			iw.deleteService(before)
 | |
| 			iw.addService(after)
 | |
| 		},
 | |
| 	})
 | |
| 
 | |
| 	k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    iw.addPod,
 | |
| 		DeleteFunc: iw.deletePod,
 | |
| 		UpdateFunc: func(_ interface{}, obj interface{}) {
 | |
| 			iw.addPod(obj)
 | |
| 		},
 | |
| 	})
 | |
| 
 | |
| 	return iw
 | |
| }
 | |
| 
 | |
| ////////////////////////
 | |
| /// IPWatcher ///
 | |
| ////////////////////////
 | |
| 
 | |
| // Subscribe to an authority.
 | |
| // The provided listener will be updated each time the address set for the
 | |
| // given authority is changed.
 | |
| func (iw *IPWatcher) Subscribe(clusterIP string, port Port, listener EndpointUpdateListener) error {
 | |
| 	iw.log.Infof("Establishing watch on service cluster ip [%s:%d]", clusterIP, port)
 | |
| 	ss := iw.getOrNewServiceSubscriptions(clusterIP)
 | |
| 	return ss.subscribe(port, listener)
 | |
| }
 | |
| 
 | |
| // Unsubscribe removes a listener from the subscribers list for this authority.
 | |
| func (iw *IPWatcher) Unsubscribe(clusterIP string, port Port, listener EndpointUpdateListener) {
 | |
| 	iw.log.Infof("Stopping watch on service cluster ip [%s:%d]", clusterIP, port)
 | |
| 	ss, ok := iw.getServiceSubscriptions(clusterIP)
 | |
| 	if !ok {
 | |
| 		iw.log.Errorf("Cannot unsubscribe from unknown service ip [%s:%d]", clusterIP, port)
 | |
| 		return
 | |
| 	}
 | |
| 	ss.unsubscribe(port, listener)
 | |
| }
 | |
| 
 | |
| // GetSvcID returns the service that corresponds to a Cluster IP address if one
 | |
| // exists.
 | |
| func (iw *IPWatcher) GetSvcID(clusterIP string) (*ServiceID, error) {
 | |
| 	resource, err := getResource(clusterIP, iw.k8sAPI.Svc().Informer())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if svc, ok := resource.(*corev1.Service); ok {
 | |
| 		service := &ServiceID{
 | |
| 			Namespace: svc.Namespace,
 | |
| 			Name:      svc.Name,
 | |
| 		}
 | |
| 		return service, nil
 | |
| 	}
 | |
| 	// Either `clusterIP` does not map to a service that the indexer is aware
 | |
| 	// of, or it maps to a resource that is ignored
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| // GetPod returns the pod that corresponds to an IP address if one exists.
 | |
| func (iw *IPWatcher) GetPod(podIP string) (*corev1.Pod, error) {
 | |
| 	resource, err := getResource(podIP, iw.k8sAPI.Pod().Informer())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if pod, ok := resource.(*corev1.Pod); ok {
 | |
| 		return pod, nil
 | |
| 	}
 | |
| 	// Either `podIP` does not map to a pod that the indexer is aware of, or
 | |
| 	// it maps to a resource that is ignored
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| func getResource(ip string, informer cache.SharedIndexInformer) (interface{}, error) {
 | |
| 	matchingObjs, err := informer.GetIndexer().ByIndex(podIPIndex, ip)
 | |
| 	if err != nil {
 | |
| 		return nil, status.Error(codes.Unknown, err.Error())
 | |
| 	}
 | |
| 
 | |
| 	objs := make([]interface{}, 0)
 | |
| 	for _, obj := range matchingObjs {
 | |
| 		// Ignore terminated pods,
 | |
| 		// their IPs can be reused for new Running pods
 | |
| 		pod, ok := obj.(*corev1.Pod)
 | |
| 		if ok && podTerminated(pod) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		objs = append(objs, obj)
 | |
| 	}
 | |
| 
 | |
| 	if len(objs) > 1 {
 | |
| 		return nil, status.Errorf(codes.FailedPrecondition, "IP address conflict: %v, %v", objs[0], objs[1])
 | |
| 	}
 | |
| 	if len(objs) == 1 {
 | |
| 		return objs[0], nil
 | |
| 	}
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| func podTerminated(pod *corev1.Pod) bool {
 | |
| 	phase := pod.Status.Phase
 | |
| 	return phase == corev1.PodSucceeded || phase == corev1.PodFailed
 | |
| }
 | |
| 
 | |
| func (iw *IPWatcher) addService(obj interface{}) {
 | |
| 	service := obj.(*corev1.Service)
 | |
| 	if service.Namespace == kubeSystem || service.Spec.ClusterIP == "None" {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ss := iw.getOrNewServiceSubscriptions(service.Spec.ClusterIP)
 | |
| 
 | |
| 	ss.updateService(service)
 | |
| }
 | |
| 
 | |
| func (iw *IPWatcher) deleteService(obj interface{}) {
 | |
| 	service, ok := obj.(*corev1.Service)
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			iw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 		service, ok = tombstone.Obj.(*corev1.Service)
 | |
| 		if !ok {
 | |
| 			iw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if service.Namespace == kubeSystem {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ss, ok := iw.getServiceSubscriptions(service.Spec.ClusterIP)
 | |
| 	if ok {
 | |
| 		ss.deleteService()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (iw *IPWatcher) addPod(obj interface{}) {
 | |
| 	pod := obj.(*corev1.Pod)
 | |
| 	if pod.Namespace == kubeSystem {
 | |
| 		return
 | |
| 	}
 | |
| 	if pod.Status.PodIP == "" {
 | |
| 		// Pod has not yet been assigned an IP address.
 | |
| 		return
 | |
| 	}
 | |
| 	if pod.Spec.HostNetwork {
 | |
| 		// Don't trigger updates for watches on host IP addresses.
 | |
| 		return
 | |
| 	}
 | |
| 	ss := iw.getOrNewServiceSubscriptions(pod.Status.PodIP)
 | |
| 	ss.updatePod(iw.PodToAddressSet(pod))
 | |
| }
 | |
| 
 | |
| func (iw *IPWatcher) deletePod(obj interface{}) {
 | |
| 	pod, ok := obj.(*corev1.Pod)
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			iw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 		pod, ok = tombstone.Obj.(*corev1.Pod)
 | |
| 		if !ok {
 | |
| 			iw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if pod.Namespace == kubeSystem {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ss, ok := iw.getServiceSubscriptions(pod.Status.PodIP)
 | |
| 	if ok {
 | |
| 		ss.deletePod()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Returns the serviceSubscriptions for the given clusterIP if it exists.  Otherwise,
 | |
| // create a new one and return it.
 | |
| func (iw *IPWatcher) getOrNewServiceSubscriptions(clusterIP string) *serviceSubscriptions {
 | |
| 	iw.Lock()
 | |
| 	defer iw.Unlock()
 | |
| 
 | |
| 	// If the service doesn't yet exist, create a stub for it so the listener can
 | |
| 	// be registered.
 | |
| 	ss, ok := iw.publishers[clusterIP]
 | |
| 	if !ok {
 | |
| 		ss = &serviceSubscriptions{
 | |
| 			clusterIP: clusterIP,
 | |
| 			listeners: make(map[EndpointUpdateListener]Port),
 | |
| 			endpoints: iw.endpoints,
 | |
| 			log:       iw.log.WithField("clusterIP", clusterIP),
 | |
| 		}
 | |
| 
 | |
| 		objs, err := iw.k8sAPI.Svc().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
 | |
| 		if err != nil {
 | |
| 			iw.log.Error(err)
 | |
| 		} else {
 | |
| 			if len(objs) > 1 {
 | |
| 				iw.log.Errorf("Service cluster IP conflict: %v, %v", objs[0], objs[1])
 | |
| 			}
 | |
| 			if len(objs) == 1 {
 | |
| 				if svc, ok := objs[0].(*corev1.Service); ok {
 | |
| 					ss.service = ServiceID{
 | |
| 						Namespace: svc.Namespace,
 | |
| 						Name:      svc.Name,
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		objs, err = iw.k8sAPI.Pod().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
 | |
| 		if err != nil {
 | |
| 			iw.log.Error(err)
 | |
| 		} else {
 | |
| 			pods := []*corev1.Pod{}
 | |
| 			for _, obj := range objs {
 | |
| 				if pod, ok := obj.(*corev1.Pod); ok {
 | |
| 					// Skip pods with HostNetwork
 | |
| 					if pod.Spec.HostNetwork {
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					// Ignore terminated pods,
 | |
| 					// their IPs can be reused for new Running pods
 | |
| 					if podTerminated(pod) {
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					pods = append(pods, pod)
 | |
| 				}
 | |
| 			}
 | |
| 			if len(pods) > 1 {
 | |
| 				iw.log.Errorf("Pod IP conflict: %v, %v", objs[0], objs[1])
 | |
| 			}
 | |
| 			if len(pods) == 1 {
 | |
| 				ss.pod = iw.PodToAddressSet(pods[0])
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		iw.publishers[clusterIP] = ss
 | |
| 	}
 | |
| 	return ss
 | |
| }
 | |
| 
 | |
| func (iw *IPWatcher) getServiceSubscriptions(clusterIP string) (ss *serviceSubscriptions, ok bool) {
 | |
| 	iw.RLock()
 | |
| 	defer iw.RUnlock()
 | |
| 	ss, ok = iw.publishers[clusterIP]
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // PodToAddressSet converts a Pod spec into a set of Addresses.
 | |
| func (iw *IPWatcher) PodToAddressSet(pod *corev1.Pod) AddressSet {
 | |
| 	ownerKind, ownerName := iw.k8sAPI.GetOwnerKindAndName(context.Background(), pod, true)
 | |
| 	return AddressSet{
 | |
| 		Addresses: map[PodID]Address{
 | |
| 			PodID{
 | |
| 				Name:      pod.Name,
 | |
| 				Namespace: pod.Namespace,
 | |
| 			}: Address{
 | |
| 				IP:        pod.Status.PodIP,
 | |
| 				Port:      0, // Will be set by individual subscriptions
 | |
| 				Pod:       pod,
 | |
| 				OwnerName: ownerName,
 | |
| 				OwnerKind: ownerKind,
 | |
| 			},
 | |
| 		},
 | |
| 		Labels: map[string]string{"namespace": pod.Namespace},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| ////////////////////////
 | |
| /// serviceSubscriptions ///
 | |
| ////////////////////////
 | |
| 
 | |
| func (ss *serviceSubscriptions) updateService(service *corev1.Service) {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 
 | |
| 	id := ServiceID{
 | |
| 		Namespace: service.Namespace,
 | |
| 		Name:      service.Name,
 | |
| 	}
 | |
| 
 | |
| 	if id != ss.service {
 | |
| 		for listener, port := range ss.listeners {
 | |
| 			ss.endpoints.Unsubscribe(ss.service, port, "", listener)
 | |
| 			listener.NoEndpoints(true) // Clear out previous endpoints.
 | |
| 			err := ss.endpoints.Subscribe(id, port, "", listener)
 | |
| 			if err != nil {
 | |
| 				ss.log.Warnf("failed to subscribe to %s: %s", id, err)
 | |
| 				listener.NoEndpoints(true) // Clear out previous endpoints.
 | |
| 				listener.Add(singletonAddress(ss.clusterIP, port))
 | |
| 			}
 | |
| 		}
 | |
| 		ss.service = id
 | |
| 		ss.pod = AddressSet{}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (ss *serviceSubscriptions) deleteService() {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 
 | |
| 	for listener, port := range ss.listeners {
 | |
| 		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
 | |
| 		listener.NoEndpoints(true) // Clear out previous endpoints.
 | |
| 		listener.Add(singletonAddress(ss.clusterIP, port))
 | |
| 
 | |
| 	}
 | |
| 	ss.service = ServiceID{}
 | |
| }
 | |
| 
 | |
| func (ss *serviceSubscriptions) updatePod(podSet AddressSet) {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 
 | |
| 	for listener, port := range ss.listeners {
 | |
| 		// Pod IP addresses never change. Therefore, we don't need to send a
 | |
| 		// NoEndpoints update to clear our the previous address; sending an
 | |
| 		// Add with the same address will replace the previous one.
 | |
| 		if len(podSet.Addresses) != 0 {
 | |
| 			podSetWithPort := podSet.WithPort(port)
 | |
| 			listener.Add(podSetWithPort)
 | |
| 		}
 | |
| 	}
 | |
| 	ss.pod = podSet
 | |
| }
 | |
| 
 | |
| func (ss *serviceSubscriptions) deletePod() {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 	for listener, port := range ss.listeners {
 | |
| 		listener.NoEndpoints(true) // Clear out previous endpoints.
 | |
| 		listener.Add(singletonAddress(ss.clusterIP, port))
 | |
| 	}
 | |
| 	ss.pod = AddressSet{}
 | |
| }
 | |
| 
 | |
| func (ss *serviceSubscriptions) subscribe(port Port, listener EndpointUpdateListener) error {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 
 | |
| 	if (ss.service != ServiceID{}) {
 | |
| 		err := ss.endpoints.Subscribe(ss.service, port, "", listener)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	} else if len(ss.pod.Addresses) != 0 {
 | |
| 		podSetWithPort := ss.pod.WithPort(port)
 | |
| 		listener.Add(podSetWithPort)
 | |
| 	} else {
 | |
| 		listener.Add(singletonAddress(ss.clusterIP, port))
 | |
| 	}
 | |
| 	ss.listeners[listener] = port
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (ss *serviceSubscriptions) unsubscribe(port Port, listener EndpointUpdateListener) {
 | |
| 	ss.Lock()
 | |
| 	defer ss.Unlock()
 | |
| 
 | |
| 	if (ss.service != ServiceID{}) {
 | |
| 		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
 | |
| 	}
 | |
| 	delete(ss.listeners, listener)
 | |
| }
 | |
| 
 | |
| func singletonAddress(ip string, port Port) AddressSet {
 | |
| 	return AddressSet{
 | |
| 		Addresses: map[PodID]Address{
 | |
| 			PodID{}: Address{
 | |
| 				IP:   ip,
 | |
| 				Port: port,
 | |
| 			},
 | |
| 		},
 | |
| 		Labels: map[string]string{},
 | |
| 	}
 | |
| }
 |