linkerd2/controller/api/destination/watcher/opaque_ports_watcher.go

287 lines
8.0 KiB
Go

package watcher
import (
"strconv"
"sync"
"time"
"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// OpaquePortsWatcher watches all the services in the cluster. If the
// opaque ports annotation is added to a service, the watcher will update
// listeners—if any—subscribed to that service.
type OpaquePortsWatcher struct {
	// subscriptions holds the per-service state (current opaque ports and
	// registered listeners), keyed by service ID. The methods of this type
	// take the embedded lock before touching it.
	subscriptions map[ServiceID]*svcSubscriptions
	// k8sAPI gives access to the service informer and lister.
	k8sAPI *k8s.API
	// subscribersGauge reports the number of listeners per service.
	subscribersGauge *prometheus.GaugeVec
	log              *logging.Entry
	// defaultOpaquePorts is the port set used for a service that does not
	// carry the opaque ports annotation.
	defaultOpaquePorts map[uint32]struct{}

	sync.RWMutex
}

// svcSubscriptions is the state tracked for a single service.
type svcSubscriptions struct {
	// opaquePorts is the set of opaque ports last recorded for the service.
	opaquePorts map[uint32]struct{}
	// listeners are the subscribers that are told when opaquePorts changes.
	listeners []OpaquePortsUpdateListener
}

// OpaquePortsUpdateListener is the interface that subscribers must implement.
type OpaquePortsUpdateListener interface {
	// UpdateService is called with the service's current set of opaque
	// ports.
	UpdateService(ports map[uint32]struct{})
}
// opaquePortsMetrics is the gauge vector handed to every OpaquePortsWatcher
// as its subscribersGauge. It is labelled by service namespace and name, and
// Subscribe/Unsubscribe set it to the number of listeners registered for
// that service.
var opaquePortsMetrics = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "service_subscribers",
Help: "Number of subscribers to Service changes.",
},
[]string{"namespace", "name"},
)
// NewOpaquePortsWatcher creates an OpaquePortsWatcher and registers its
// add/update/delete handlers on the service informer of k8sAPI, so that the
// watcher is told about service changes. opaquePorts is the default set used
// for services that carry no opaque ports annotation.
//
// An error is returned if the handlers cannot be registered.
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
	watcher := &OpaquePortsWatcher{
		subscriptions:      make(map[ServiceID]*svcSubscriptions),
		k8sAPI:             k8sAPI,
		subscribersGauge:   opaquePortsMetrics,
		log:                log.WithField("component", "opaque-ports-watcher"),
		defaultOpaquePorts: opaquePorts,
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    watcher.addService,
		DeleteFunc: watcher.deleteService,
		UpdateFunc: watcher.updateService,
	}
	if _, err := k8sAPI.Svc().Informer().AddEventHandler(handlers); err != nil {
		return nil, err
	}

	return watcher, nil
}
// Subscribe registers listener for the service identified by id; whenever
// the service's set of opaque ports changes, the listener is updated.
//
// An error is returned, and nothing is registered, if the service is
// currently known to be of type ExternalName.
func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error {
	opw.Lock()
	defer opw.Unlock()

	// The lookup error is discarded on purpose: only a service that was
	// actually found can be rejected here.
	svc, _ := opw.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
	if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
		return invalidService(id.String())
	}

	opw.log.Debugf("Starting watch on service %s", id)

	entry, known := opw.subscriptions[id]
	if known {
		// The service is already tracked: register the listener and bring
		// it up to date with the ports currently recorded for the service.
		entry.listeners = append(entry.listeners, listener)
		listener.UpdateService(entry.opaquePorts)
	} else {
		// Nothing is tracked for this service yet: start an entry seeded
		// with the default opaque ports. The listener receives no initial
		// update in this case.
		entry = &svcSubscriptions{
			opaquePorts: opw.defaultOpaquePorts,
			listeners:   []OpaquePortsUpdateListener{listener},
		}
		opw.subscriptions[id] = entry
	}

	opw.subscribersGauge.With(id.Labels()).Set(float64(len(entry.listeners)))
	return nil
}
// Unsubscribe removes one registration of listener from the service
// identified by id, mirroring a single call to Subscribe. When the last
// listener of a service is removed, the service's subscribers metric and its
// subscription entry are deleted as well.
//
// Unsubscribing from a service that has no subscription entry only logs an
// error.
func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener) {
	opw.Lock()
	defer opw.Unlock()

	opw.log.Debugf("Stopping watch on service %s", id)

	ss, ok := opw.subscriptions[id]
	if !ok {
		opw.log.Errorf("Cannot unsubscribe from unknown service %s", id)
		return
	}

	for i, l := range ss.listeners {
		if l == listener {
			// Swap-remove: move the last listener into the freed slot and
			// clear the tail so the removed listener is not kept alive.
			n := len(ss.listeners)
			ss.listeners[i] = ss.listeners[n-1]
			ss.listeners[n-1] = nil
			ss.listeners = ss.listeners[:n-1]
			// Stop after the first match. The range expression was
			// evaluated before the slice was shortened, so continuing would
			// walk stale indices: duplicates would be removed or kept
			// depending on their position, and an index past the new
			// length could be written to.
			break
		}
	}

	// Not named "labels": that would shadow the imported package alias.
	metricLabels := id.Labels()
	if len(ss.listeners) > 0 {
		opw.subscribersGauge.With(metricLabels).Set(float64(len(ss.listeners)))
		return
	}

	if !opw.subscribersGauge.Delete(metricLabels) {
		opw.log.Warnf("unable to delete service_subscribers metric with labels %s", metricLabels)
	}
	delete(opw.subscriptions, id)
}
// updateService is the informer's update handler. It records how long ago
// the new object was last updated (per its managed fields) in
// serviceInformerLag, and then processes the new object exactly like an add.
//
// Both arguments are expected to be *corev1.Service; anything else panics on
// the type assertion.
func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) {
	newSvc := newObj.(*corev1.Service)
	oldSvc := oldObj.(*corev1.Service)

	oldUpdated := latestUpdated(oldSvc.ManagedFields)
	updated := latestUpdated(newSvc.ManagedFields)
	// Only observe the lag when the update timestamp actually moved. The
	// instants are compared with Equal rather than !=, because != also
	// compares the Location and monotonic clock state of the two values and
	// can report a difference for the same instant.
	if !updated.IsZero() && !updated.Equal(oldUpdated) {
		delta := time.Since(updated)
		serviceInformerLag.Observe(delta.Seconds())
	}

	opw.addService(newObj)
}
// addService records the opaque ports of the given service and, if they
// differ from what was previously recorded, notifies every listener
// subscribed to that service. It is used for both added and updated
// services.
//
// obj is expected to be a *corev1.Service; anything else panics on the type
// assertion.
func (opw *OpaquePortsWatcher) addService(obj interface{}) {
	opw.Lock()
	defer opw.Unlock()

	svc := obj.(*corev1.Service)
	id := ServiceID{Namespace: svc.Namespace, Name: svc.Name}

	ports, annotated, err := getServiceOpaquePortsAnnotation(svc)
	if err != nil {
		opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err)
		return
	}
	if !annotated {
		// Without the annotation the service falls back to the defaults.
		ports = opw.defaultOpaquePorts
	}

	current, tracked := opw.subscriptions[id]
	switch {
	case !tracked:
		// First time this service is seen: remember its ports. There is
		// nobody to notify yet.
		opw.subscriptions[id] = &svcSubscriptions{
			opaquePorts: ports,
			listeners:   []OpaquePortsUpdateListener{},
		}
	case !portsEqual(current.opaquePorts, ports):
		// The port set changed: store it and tell every listener.
		current.opaquePorts = ports
		for _, l := range current.listeners {
			l.UpdateService(current.opaquePorts)
		}
	}
}
// deleteService is the informer's delete handler. It resets the opaque ports
// recorded for the deleted service to the defaults and, if that changes the
// recorded set, notifies the service's listeners. The subscription entry
// itself is kept.
//
// obj may be the *corev1.Service itself or a cache.DeletedFinalStateUnknown
// tombstone wrapping one; any other value is logged and ignored.
func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
	opw.Lock()
	defer opw.Unlock()

	var service *corev1.Service
	switch o := obj.(type) {
	case *corev1.Service:
		service = o
	case cache.DeletedFinalStateUnknown:
		wrapped, isSvc := o.Obj.(*corev1.Service)
		if !isSvc {
			opw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
			return
		}
		service = wrapped
	default:
		opw.log.Errorf("could not get object from DeletedFinalStateUnknown %#v", obj)
		return
	}

	id := ServiceID{Namespace: service.Namespace, Name: service.Name}
	entry, tracked := opw.subscriptions[id]
	if !tracked {
		return
	}

	previous := entry.opaquePorts
	entry.opaquePorts = opw.defaultOpaquePorts
	// Listeners only need to hear about it if the service was not already
	// on the default opaque ports.
	if !portsEqual(previous, entry.opaquePorts) {
		for _, l := range entry.listeners {
			l.UpdateService(entry.opaquePorts)
		}
	}
}
// getServiceOpaquePortsAnnotation reads the opaque ports annotation of svc.
//
// It returns the set of ports named by the annotation, whether the
// annotation is present at all, and any error hit while converting a port to
// a number. A present but empty annotation yields an empty, non-nil set; on
// error the set is nil while the presence flag is still true.
func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) {
	raw, present := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
	if !present {
		return nil, false, nil
	}

	ports := make(map[uint32]struct{})
	if raw == "" {
		return ports, true, nil
	}

	for _, s := range parseServiceOpaquePorts(raw, svc.Spec.Ports) {
		n, err := strconv.ParseUint(s, 10, 32)
		if err != nil {
			return nil, true, err
		}
		ports[uint32(n)] = struct{}{}
	}
	return ports, true, nil
}
// parseServiceOpaquePorts expands the value of an opaque ports annotation
// into a list of individual port numbers, rendered as decimal strings.
//
// Each entry returned by util.GetPortRanges is first matched against the
// names of the service ports sps; a match contributes that service port's
// number. Any other entry is parsed as a port range and contributes every
// port from its lower to its upper bound, inclusive. Entries that are
// neither a port name nor a valid range are logged and skipped.
func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string {
	portRanges := util.GetPortRanges(annotation)

	var values []string
	for _, pr := range portRanges {
		if port, named := isNamed(pr, sps); named {
			values = append(values, strconv.Itoa(int(port)))
			continue
		}

		// The parsed range gets its own name so that the warning below
		// reports the offending annotation entry rather than the result of
		// the failed parse.
		parsed, err := util.ParsePortRange(pr)
		if err != nil {
			logging.Warnf("Invalid port range [%v]: %s", pr, err)
			continue
		}
		for i := parsed.LowerBound; i <= parsed.UpperBound; i++ {
			values = append(values, strconv.Itoa(i))
		}
	}
	return values
}
// isNamed reports whether pr is the name of one of the service ports sps
// and, if so, returns that port's number. A string such as `123-456` is a
// valid port name as well as a valid port range, so an annotation entry has
// to be compared against the port names before it is expanded as a range.
//
// The first service port with a matching name wins; (0, false) is returned
// when none matches.
func isNamed(pr string, sps []corev1.ServicePort) (int32, bool) {
	for i := range sps {
		if sps[i].Name != pr {
			continue
		}
		return sps[i].Port, true
	}
	return 0, false
}
// portsEqual reports whether x and y contain exactly the same ports. A nil
// set and an empty set are considered equal.
func portsEqual(x, y map[uint32]struct{}) bool {
	switch {
	case len(x) != len(y):
		return false
	case len(x) == 0:
		return true
	}

	// Same size, so x and y are equal as soon as every port of x is in y.
	for p := range x {
		if _, present := y[p]; !present {
			return false
		}
	}
	return true
}