diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index bc88998ae..edaaa7dcf 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -35,9 +35,6 @@ const ( const endpointTargetRefPod = "Pod" -// TODO: prom metrics for all the queues/caches -// https://github.com/linkerd/linkerd2/issues/2204 - type ( // Address represents an individual port on a specific endpoint. // This endpoint might be the result of a the existence of a pod diff --git a/controller/api/destination/watcher/k8s.go b/controller/api/destination/watcher/k8s.go index d316195ba..eb564b7d9 100644 --- a/controller/api/destination/watcher/k8s.go +++ b/controller/api/destination/watcher/k8s.go @@ -5,6 +5,7 @@ import ( "net" "github.com/linkerd/linkerd2/controller/k8s" + "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" @@ -41,6 +42,11 @@ type ( } ) +// Labels returns the labels for prometheus metrics associated to the service +func (id ServiceID) Labels() prometheus.Labels { + return prometheus.Labels{"namespace": id.Namespace, "name": id.Name} +} + func (is InvalidService) Error() string { return fmt.Sprintf("Invalid k8s service %s", is.authority) } diff --git a/controller/api/destination/watcher/opaque_ports_watcher.go b/controller/api/destination/watcher/opaque_ports_watcher.go index afcde3095..79c4bbc77 100644 --- a/controller/api/destination/watcher/opaque_ports_watcher.go +++ b/controller/api/destination/watcher/opaque_ports_watcher.go @@ -7,6 +7,8 @@ import ( "github.com/linkerd/linkerd2/controller/k8s" labels "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -19,6 +21,7 @@ type ( OpaquePortsWatcher struct { subscriptions map[ServiceID]*svcSubscriptions k8sAPI *k8s.API + subscribersGauge *prometheus.GaugeVec log *logging.Entry defaultOpaquePorts map[uint32]struct{} sync.RWMutex @@ -35,12 +38,21 @@ type ( } ) +var opaquePortsMetrics = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "service_subscribers", + Help: "Number of subscribers to Service changes.", + }, + []string{"namespace", "name"}, +) + // NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for // k8sAPI for service changes. func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) { opw := &OpaquePortsWatcher{ subscriptions: make(map[ServiceID]*svcSubscriptions), k8sAPI: k8sAPI, + subscribersGauge: opaquePortsMetrics, log: log.WithField("component", "opaque-ports-watcher"), defaultOpaquePorts: opaquePorts, } @@ -67,21 +79,27 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat return invalidService(id.String()) } opw.log.Debugf("Starting watch on service %s", id) + var numListeners float64 ss, ok := opw.subscriptions[id] - // If there is no watched service, create a subscription for the service - // and no opaque ports if !ok { + // If there is no watched service, create a subscription for the service + // and no opaque ports opw.subscriptions[id] = &svcSubscriptions{ opaquePorts: opw.defaultOpaquePorts, listeners: []OpaquePortsUpdateListener{listener}, } - return nil + numListeners = 1 + } else { + // There are subscriptions for this service, so add the listener to the + // service listeners. If there are opaque ports for the service, update + // the listener with that value. + ss.listeners = append(ss.listeners, listener) + listener.UpdateService(ss.opaquePorts) + numListeners = float64(len(ss.listeners)) } - // There are subscriptions for this service, so add the listener to the - // service listeners. If there are opaque ports for the service, update - // the listener with that value. - ss.listeners = append(ss.listeners, listener) - listener.UpdateService(ss.opaquePorts) + + opw.subscribersGauge.With(id.Labels()).Set(numListeners) + return nil } @@ -103,6 +121,16 @@ func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpd ss.listeners = ss.listeners[:n-1] } } + + labels := id.Labels() + if len(ss.listeners) > 0 { + opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners))) + } else { + if !opw.subscribersGauge.Delete(labels) { + opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels) + } + delete(opw.subscriptions, id) + } } func (opw *OpaquePortsWatcher) addService(obj interface{}) { diff --git a/controller/api/destination/watcher/server_watcher.go b/controller/api/destination/watcher/server_watcher.go index 91dd01fa4..5e3db7e45 100644 --- a/controller/api/destination/watcher/server_watcher.go +++ b/controller/api/destination/watcher/server_watcher.go @@ -1,10 +1,14 @@ package watcher import ( + "strconv" + "strings" "sync" "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1" "github.com/linkerd/linkerd2/controller/k8s" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -17,9 +21,10 @@ import ( // update, it only sends updates to listeners if their endpoint's protocol // is changed by the Server. type ServerWatcher struct { - subscriptions map[podPort][]ServerUpdateListener - k8sAPI *k8s.API - log *logging.Entry + subscriptions map[podPort][]ServerUpdateListener + k8sAPI *k8s.API + subscribersGauge *prometheus.GaugeVec + log *logging.Entry sync.RWMutex } @@ -36,12 +41,21 @@ type ServerUpdateListener interface { UpdateProtocol(bool) } +var serverMetrics = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "server_port_subscribers", + Help: "Number of subscribers to Server changes associated with a pod's port.", + }, + []string{"namespace", "name", "port"}, +) + // NewServerWatcher creates a new ServerWatcher. func NewServerWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ServerWatcher, error) { sw := &ServerWatcher{ - subscriptions: make(map[podPort][]ServerUpdateListener), - k8sAPI: k8sAPI, - log: log, + subscriptions: make(map[podPort][]ServerUpdateListener), + k8sAPI: k8sAPI, + subscribersGauge: serverMetrics, + log: log, } _, err := k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sw.addServer, @@ -66,11 +80,13 @@ func (sw *ServerWatcher) Subscribe(pod *corev1.Pod, port Port, listener ServerUp } listeners, ok := sw.subscriptions[pp] if !ok { - sw.subscriptions[pp] = []ServerUpdateListener{listener} - return + listeners = []ServerUpdateListener{listener} + } else { + listeners = append(listeners, listener) } - listeners = append(listeners, listener) sw.subscriptions[pp] = listeners + + sw.subscribersGauge.With(serverMetricLabels(pod, port)).Set(float64(len(listeners))) } // Unsubscribe unsubcribes a listener from any Server updates. @@ -95,9 +111,14 @@ func (sw *ServerWatcher) Unsubscribe(pod *corev1.Pod, port Port, listener Server } } + labels := serverMetricLabels(pod, port) if len(listeners) > 0 { + sw.subscribersGauge.With(labels).Set(float64(len(listeners))) sw.subscriptions[pp] = listeners } else { + if !sw.subscribersGauge.Delete(labels) { + sw.log.Warnf("unable to delete server_port_subscribers metric with labels %s", labels) + } delete(sw.subscriptions, pp) } } @@ -158,3 +179,12 @@ func (sw *ServerWatcher) updateServer(server *v1beta1.Server, selector labels.Se } } } + +func serverMetricLabels(pod *corev1.Pod, port Port) prometheus.Labels { + podName, _, _ := strings.Cut(pod.Name, "-") + return prometheus.Labels{ + "namespace": pod.Namespace, + "name": podName, + "port": strconv.FormatUint(uint64(port), 10), + } +}