Add metrics to server and service watchers (#10213)

* Add metrics to server and service watchers

Closes #10202 and completes #2204

As a followup to #10201, I'm adding the following metric in `server_watcher.go`:

- `server_port_subscribers`: This tracks the number of subscribers to changes to Servers associated to a port in a pod. The metric's label identify the namespace and name of the pod, and its targeted port.

Additionally, `opaque_ports.go` was missing metrics as well. I added `service_subscribers` which tracks the number of subscribers to a given Service, labeled by the Service's namespace and name.

`opaque_ports.go` was also leaking the subscriber's map key, so that got fixed as well.
This commit is contained in:
Alejandro Pedraza 2023-02-07 08:51:09 -05:00 committed by GitHub
parent fc7d553683
commit 147c8dc07c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 20 deletions

View File

@ -35,9 +35,6 @@ const (
const endpointTargetRefPod = "Pod"
// TODO: prom metrics for all the queues/caches
// https://github.com/linkerd/linkerd2/issues/2204
type (
// Address represents an individual port on a specific endpoint.
// This endpoint might be the result of a the existence of a pod

View File

@ -5,6 +5,7 @@ import (
"net"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
@ -41,6 +42,11 @@ type (
}
)
// Labels returns the labels for prometheus metrics associated to the service
func (id ServiceID) Labels() prometheus.Labels {
return prometheus.Labels{"namespace": id.Namespace, "name": id.Name}
}
func (is InvalidService) Error() string {
return fmt.Sprintf("Invalid k8s service %s", is.authority)
}

View File

@ -7,6 +7,8 @@ import (
"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
@ -19,6 +21,7 @@ type (
OpaquePortsWatcher struct {
subscriptions map[ServiceID]*svcSubscriptions
k8sAPI *k8s.API
subscribersGauge *prometheus.GaugeVec
log *logging.Entry
defaultOpaquePorts map[uint32]struct{}
sync.RWMutex
@ -35,12 +38,21 @@ type (
}
)
var opaquePortsMetrics = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "service_subscribers",
Help: "Number of subscribers to Service changes.",
},
[]string{"namespace", "name"},
)
// NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for
// k8sAPI for service changes.
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) {
opw := &OpaquePortsWatcher{
subscriptions: make(map[ServiceID]*svcSubscriptions),
k8sAPI: k8sAPI,
subscribersGauge: opaquePortsMetrics,
log: log.WithField("component", "opaque-ports-watcher"),
defaultOpaquePorts: opaquePorts,
}
@ -67,21 +79,27 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat
return invalidService(id.String())
}
opw.log.Debugf("Starting watch on service %s", id)
var numListeners float64
ss, ok := opw.subscriptions[id]
// If there is no watched service, create a subscription for the service
// and no opaque ports
if !ok {
// If there is no watched service, create a subscription for the service
// and no opaque ports
opw.subscriptions[id] = &svcSubscriptions{
opaquePorts: opw.defaultOpaquePorts,
listeners: []OpaquePortsUpdateListener{listener},
}
return nil
numListeners = 1
} else {
// There are subscriptions for this service, so add the listener to the
// service listeners. If there are opaque ports for the service, update
// the listener with that value.
ss.listeners = append(ss.listeners, listener)
listener.UpdateService(ss.opaquePorts)
numListeners = float64(len(ss.listeners))
}
// There are subscriptions for this service, so add the listener to the
// service listeners. If there are opaque ports for the service, update
// the listener with that value.
ss.listeners = append(ss.listeners, listener)
listener.UpdateService(ss.opaquePorts)
opw.subscribersGauge.With(id.Labels()).Set(numListeners)
return nil
}
@ -103,6 +121,16 @@ func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpd
ss.listeners = ss.listeners[:n-1]
}
}
labels := id.Labels()
if len(ss.listeners) > 0 {
opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
} else {
if !opw.subscribersGauge.Delete(labels) {
opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)
}
delete(opw.subscriptions, id)
}
}
func (opw *OpaquePortsWatcher) addService(obj interface{}) {

View File

@ -1,10 +1,14 @@
package watcher
import (
"strconv"
"strings"
"sync"
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -17,9 +21,10 @@ import (
// update, it only sends updates to listeners if their endpoint's protocol
// is changed by the Server.
type ServerWatcher struct {
subscriptions map[podPort][]ServerUpdateListener
k8sAPI *k8s.API
log *logging.Entry
subscriptions map[podPort][]ServerUpdateListener
k8sAPI *k8s.API
subscribersGauge *prometheus.GaugeVec
log *logging.Entry
sync.RWMutex
}
@ -36,12 +41,21 @@ type ServerUpdateListener interface {
UpdateProtocol(bool)
}
var serverMetrics = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "server_port_subscribers",
Help: "Number of subscribers to Server changes associated with a pod's port.",
},
[]string{"namespace", "name", "port"},
)
// NewServerWatcher creates a new ServerWatcher.
func NewServerWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ServerWatcher, error) {
sw := &ServerWatcher{
subscriptions: make(map[podPort][]ServerUpdateListener),
k8sAPI: k8sAPI,
log: log,
subscriptions: make(map[podPort][]ServerUpdateListener),
k8sAPI: k8sAPI,
subscribersGauge: serverMetrics,
log: log,
}
_, err := k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sw.addServer,
@ -66,11 +80,13 @@ func (sw *ServerWatcher) Subscribe(pod *corev1.Pod, port Port, listener ServerUp
}
listeners, ok := sw.subscriptions[pp]
if !ok {
sw.subscriptions[pp] = []ServerUpdateListener{listener}
return
listeners = []ServerUpdateListener{listener}
} else {
listeners = append(listeners, listener)
}
listeners = append(listeners, listener)
sw.subscriptions[pp] = listeners
sw.subscribersGauge.With(serverMetricLabels(pod, port)).Set(float64(len(listeners)))
}
// Unsubscribe unsubcribes a listener from any Server updates.
@ -95,9 +111,14 @@ func (sw *ServerWatcher) Unsubscribe(pod *corev1.Pod, port Port, listener Server
}
}
labels := serverMetricLabels(pod, port)
if len(listeners) > 0 {
sw.subscribersGauge.With(labels).Set(float64(len(listeners)))
sw.subscriptions[pp] = listeners
} else {
if !sw.subscribersGauge.Delete(labels) {
sw.log.Warnf("unable to delete server_port_subscribers metric with labels %s", labels)
}
delete(sw.subscriptions, pp)
}
}
@ -158,3 +179,12 @@ func (sw *ServerWatcher) updateServer(server *v1beta1.Server, selector labels.Se
}
}
}
func serverMetricLabels(pod *corev1.Pod, port Port) prometheus.Labels {
podName, _, _ := strings.Cut(pod.Name, "-")
return prometheus.Labels{
"namespace": pod.Namespace,
"name": podName,
"port": strconv.FormatUint(uint64(port), 10),
}
}