mirror of https://github.com/linkerd/linkerd2.git
				
				
				
			
		
			
				
	
	
		
			287 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			287 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
| package watcher
 | |
| 
 | |
| import (
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/linkerd/linkerd2/controller/k8s"
 | |
| 	labels "github.com/linkerd/linkerd2/pkg/k8s"
 | |
| 	"github.com/linkerd/linkerd2/pkg/util"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/client_golang/prometheus/promauto"
 | |
| 	logging "github.com/sirupsen/logrus"
 | |
| 	corev1 "k8s.io/api/core/v1"
 | |
| 	"k8s.io/client-go/tools/cache"
 | |
| )
 | |
| 
 | |
| type (
 | |
| 	// OpaquePortsWatcher watches all the services in the cluster. If the
 | |
| 	// opaque ports annotation is added to a service, the watcher will update
 | |
| 	// listeners—if any—subscribed to that service.
 | |
| 	OpaquePortsWatcher struct {
 | |
| 		subscriptions      map[ServiceID]*svcSubscriptions
 | |
| 		k8sAPI             *k8s.API
 | |
| 		subscribersGauge   *prometheus.GaugeVec
 | |
| 		log                *logging.Entry
 | |
| 		defaultOpaquePorts map[uint32]struct{}
 | |
| 		sync.RWMutex
 | |
| 	}
 | |
| 
 | |
| 	svcSubscriptions struct {
 | |
| 		opaquePorts map[uint32]struct{}
 | |
| 		listeners   []OpaquePortsUpdateListener
 | |
| 	}
 | |
| 
 | |
| 	// OpaquePortsUpdateListener is the interface that subscribers must implement.
 | |
| 	OpaquePortsUpdateListener interface {
 | |
| 		UpdateService(ports map[uint32]struct{})
 | |
| 	}
 | |
| )
 | |
| 
 | |
| var opaquePortsMetrics = promauto.NewGaugeVec(
 | |
| 	prometheus.GaugeOpts{
 | |
| 		Name: "service_subscribers",
 | |
| 		Help: "Number of subscribers to Service changes.",
 | |
| 	},
 | |
| 	[]string{"namespace", "name"},
 | |
| )
 | |
| 
 | |
| // NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for
 | |
| // k8sAPI for service changes.
 | |
| func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
 | |
| 	opw := &OpaquePortsWatcher{
 | |
| 		subscriptions:      make(map[ServiceID]*svcSubscriptions),
 | |
| 		k8sAPI:             k8sAPI,
 | |
| 		subscribersGauge:   opaquePortsMetrics,
 | |
| 		log:                log.WithField("component", "opaque-ports-watcher"),
 | |
| 		defaultOpaquePorts: opaquePorts,
 | |
| 	}
 | |
| 	_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 | |
| 		AddFunc:    opw.addService,
 | |
| 		DeleteFunc: opw.deleteService,
 | |
| 		UpdateFunc: opw.updateService,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return opw, nil
 | |
| }
 | |
| 
 | |
| // Subscribe subscribes a listener to a service; each time the service
 | |
| // changes, the listener will be updated if the list of opaque ports
 | |
| // changes.
 | |
| func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error {
 | |
| 	opw.Lock()
 | |
| 	defer opw.Unlock()
 | |
| 	svc, _ := opw.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
 | |
| 	if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
 | |
| 		return invalidService(id.String())
 | |
| 	}
 | |
| 	opw.log.Debugf("Starting watch on service %s", id)
 | |
| 	var numListeners float64
 | |
| 	ss, ok := opw.subscriptions[id]
 | |
| 	if !ok {
 | |
| 		// If there is no watched service, create a subscription for the service
 | |
| 		// and no opaque ports
 | |
| 		opw.subscriptions[id] = &svcSubscriptions{
 | |
| 			opaquePorts: opw.defaultOpaquePorts,
 | |
| 			listeners:   []OpaquePortsUpdateListener{listener},
 | |
| 		}
 | |
| 		numListeners = 1
 | |
| 	} else {
 | |
| 		// There are subscriptions for this service, so add the listener to the
 | |
| 		// service listeners. If there are opaque ports for the service, update
 | |
| 		// the listener with that value.
 | |
| 		ss.listeners = append(ss.listeners, listener)
 | |
| 		listener.UpdateService(ss.opaquePorts)
 | |
| 		numListeners = float64(len(ss.listeners))
 | |
| 	}
 | |
| 
 | |
| 	opw.subscribersGauge.With(id.Labels()).Set(numListeners)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Unsubscribe unsubscribes a listener from service.
 | |
| func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener) {
 | |
| 	opw.Lock()
 | |
| 	defer opw.Unlock()
 | |
| 	opw.log.Debugf("Stopping watch on service %s", id)
 | |
| 	ss, ok := opw.subscriptions[id]
 | |
| 	if !ok {
 | |
| 		opw.log.Errorf("Cannot unsubscribe from unknown service %s", id)
 | |
| 		return
 | |
| 	}
 | |
| 	for i, l := range ss.listeners {
 | |
| 		if l == listener {
 | |
| 			n := len(ss.listeners)
 | |
| 			ss.listeners[i] = ss.listeners[n-1]
 | |
| 			ss.listeners[n-1] = nil
 | |
| 			ss.listeners = ss.listeners[:n-1]
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	labels := id.Labels()
 | |
| 	if len(ss.listeners) > 0 {
 | |
| 		opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
 | |
| 	} else {
 | |
| 		if !opw.subscribersGauge.Delete(labels) {
 | |
| 			opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)
 | |
| 		}
 | |
| 		delete(opw.subscriptions, id)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) {
 | |
| 	newSvc := newObj.(*corev1.Service)
 | |
| 	oldSvc := oldObj.(*corev1.Service)
 | |
| 
 | |
| 	oldUpdated := latestUpdated(oldSvc.ManagedFields)
 | |
| 	updated := latestUpdated(newSvc.ManagedFields)
 | |
| 	if !updated.IsZero() && updated != oldUpdated {
 | |
| 		delta := time.Since(updated)
 | |
| 		serviceInformerLag.Observe(delta.Seconds())
 | |
| 	}
 | |
| 	opw.addService(newObj)
 | |
| }
 | |
| 
 | |
| func (opw *OpaquePortsWatcher) addService(obj interface{}) {
 | |
| 	opw.Lock()
 | |
| 	defer opw.Unlock()
 | |
| 	svc := obj.(*corev1.Service)
 | |
| 	id := ServiceID{
 | |
| 		Namespace: svc.Namespace,
 | |
| 		Name:      svc.Name,
 | |
| 	}
 | |
| 	opaquePorts, ok, err := getServiceOpaquePortsAnnotation(svc)
 | |
| 	if err != nil {
 | |
| 		opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err)
 | |
| 		return
 | |
| 	}
 | |
| 	// If the opaque ports annotation was not set, then set the service's
 | |
| 	// opaque ports to the default value.
 | |
| 	if !ok {
 | |
| 		opaquePorts = opw.defaultOpaquePorts
 | |
| 	}
 | |
| 	ss, ok := opw.subscriptions[id]
 | |
| 	// If there are no subscriptions for this service, create one with the
 | |
| 	// opaque ports.
 | |
| 	if !ok {
 | |
| 		opw.subscriptions[id] = &svcSubscriptions{
 | |
| 			opaquePorts: opaquePorts,
 | |
| 			listeners:   []OpaquePortsUpdateListener{},
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 	// Do not send updates if there was no change in the opaque ports; if
 | |
| 	// there was, send an update to each listener.
 | |
| 	if portsEqual(ss.opaquePorts, opaquePorts) {
 | |
| 		return
 | |
| 	}
 | |
| 	ss.opaquePorts = opaquePorts
 | |
| 	for _, listener := range ss.listeners {
 | |
| 		listener.UpdateService(ss.opaquePorts)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
 | |
| 	opw.Lock()
 | |
| 	defer opw.Unlock()
 | |
| 	service, ok := obj.(*corev1.Service)
 | |
| 	if !ok {
 | |
| 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 | |
| 		if !ok {
 | |
| 			opw.log.Errorf("could not get object from DeletedFinalStateUnknown %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 		service, ok = tombstone.Obj.(*corev1.Service)
 | |
| 		if !ok {
 | |
| 			opw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	id := ServiceID{
 | |
| 		Namespace: service.Namespace,
 | |
| 		Name:      service.Name,
 | |
| 	}
 | |
| 	ss, ok := opw.subscriptions[id]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 	old := ss.opaquePorts
 | |
| 	ss.opaquePorts = opw.defaultOpaquePorts
 | |
| 	// Do not send an update if the service already had the default opaque ports
 | |
| 	if portsEqual(old, ss.opaquePorts) {
 | |
| 		return
 | |
| 	}
 | |
| 	for _, listener := range ss.listeners {
 | |
| 		listener.UpdateService(ss.opaquePorts)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) {
 | |
| 	annotation, ok := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
 | |
| 	if !ok {
 | |
| 		return nil, false, nil
 | |
| 	}
 | |
| 	opaquePorts := make(map[uint32]struct{})
 | |
| 	if annotation != "" {
 | |
| 		for _, portStr := range parseServiceOpaquePorts(annotation, svc.Spec.Ports) {
 | |
| 			port, err := strconv.ParseUint(portStr, 10, 32)
 | |
| 			if err != nil {
 | |
| 				return nil, true, err
 | |
| 			}
 | |
| 			opaquePorts[uint32(port)] = struct{}{}
 | |
| 		}
 | |
| 	}
 | |
| 	return opaquePorts, true, nil
 | |
| }
 | |
| 
 | |
| func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string {
 | |
| 	portRanges := util.GetPortRanges(annotation)
 | |
| 	var values []string
 | |
| 	for _, pr := range portRanges {
 | |
| 		port, named := isNamed(pr, sps)
 | |
| 		if named {
 | |
| 			values = append(values, strconv.Itoa(int(port)))
 | |
| 		} else {
 | |
| 			pr, err := util.ParsePortRange(pr)
 | |
| 			if err != nil {
 | |
| 				logging.Warnf("Invalid port range [%v]: %s", pr, err)
 | |
| 				continue
 | |
| 			}
 | |
| 			for i := pr.LowerBound; i <= pr.UpperBound; i++ {
 | |
| 				values = append(values, strconv.Itoa(i))
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return values
 | |
| }
 | |
| 
 | |
| // isNamed checks if a port range is actually a service named port (e.g.
 | |
| // `123-456` is a valid name, but also is a valid range); all port names must
 | |
| // be checked before making it a list.
 | |
| func isNamed(pr string, sps []corev1.ServicePort) (int32, bool) {
 | |
| 	for _, sp := range sps {
 | |
| 		if sp.Name == pr {
 | |
| 			return sp.Port, true
 | |
| 		}
 | |
| 	}
 | |
| 	return 0, false
 | |
| }
 | |
| 
 | |
| func portsEqual(x, y map[uint32]struct{}) bool {
 | |
| 	if len(x) != len(y) {
 | |
| 		return false
 | |
| 	}
 | |
| 	for port := range x {
 | |
| 		_, ok := y[port]
 | |
| 		if !ok {
 | |
| 			return false
 | |
| 		}
 | |
| 	}
 | |
| 	return true
 | |
| }
 |