Add prometheus metrics for watchers (#3022)

To give better visibility into the inner workings of the Kubernetes watchers in the destination service, we add some Prometheus metrics.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2019-07-08 11:50:26 -07:00 committed by GitHub
parent 9409e0669a
commit 92ddffa3c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 224 additions and 13 deletions

View File

@ -11,7 +11,7 @@ RUN (proxy=$(bin/fetch-proxy $PROXY_VERSION) && \
echo "$version" >version.txt)
## compile proxy-identity agent
FROM gcr.io/linkerd-io/go-deps:7ac58ac0 as golang
FROM gcr.io/linkerd-io/go-deps:4c8f4294 as golang
WORKDIR /go/src/github.com/linkerd/linkerd2
ENV CGO_ENABLED=0 GOOS=linux
COPY pkg/flags pkg/flags

View File

@ -574,6 +574,7 @@
"api/prometheus/v1",
"prometheus",
"prometheus/internal",
"prometheus/promauto",
"prometheus/promhttp",
]
pruneopts = ""
@ -1323,6 +1324,7 @@
"github.com/prometheus/client_golang/api",
"github.com/prometheus/client_golang/api/prometheus/v1",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promauto",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/prometheus/common/model",
"github.com/sergi/go-diff/diffmatchpatch",

View File

@ -1,5 +1,5 @@
## compile binaries
FROM gcr.io/linkerd-io/go-deps:7ac58ac0 as golang
FROM gcr.io/linkerd-io/go-deps:4c8f4294 as golang
WORKDIR /go/src/github.com/linkerd/linkerd2
COPY cli cli
COPY chart chart

View File

@ -1,5 +1,5 @@
## compile cni-plugin utility
FROM gcr.io/linkerd-io/go-deps:7ac58ac0 as golang
FROM gcr.io/linkerd-io/go-deps:4c8f4294 as golang
WORKDIR /go/src/github.com/linkerd/linkerd2
COPY pkg pkg
COPY controller controller

View File

@ -1,5 +1,5 @@
## compile controller services
FROM gcr.io/linkerd-io/go-deps:7ac58ac0 as golang
FROM gcr.io/linkerd-io/go-deps:4c8f4294 as golang
WORKDIR /go/src/github.com/linkerd/linkerd2
COPY controller/gen controller/gen
COPY pkg pkg

View File

@ -2,9 +2,11 @@ package watcher
import (
"fmt"
"strconv"
"sync"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -69,6 +71,7 @@ type (
exists bool
pods PodSet
listeners []EndpointUpdateListener
metrics endpointsMetrics
}
// EndpointUpdateListener is the interface that subscribers must implement.
@ -79,6 +82,8 @@ type (
}
)
// endpointsVecs holds the package-wide metric vectors for endpoints. Each
// portPublisher derives its own metrics from these vectors using the labels
// built from its service namespace, service name, and port.
var endpointsVecs = newEndpointsMetricsVecs()
// NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
// k8sAPI for pod, service, and endpoint changes.
func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry) *EndpointsWatcher {
@ -292,8 +297,6 @@ func (sp *servicePublisher) subscribe(srcPort Port, listener EndpointUpdateListe
port.subscribe(listener)
}
// unsubscribe returns true iff the listener was found and removed.
// it also returns the number of listeners remaining after unsubscribing.
func (sp *servicePublisher) unsubscribe(srcPort Port, listener EndpointUpdateListener) {
sp.Lock()
defer sp.Unlock()
@ -301,6 +304,10 @@ func (sp *servicePublisher) unsubscribe(srcPort Port, listener EndpointUpdateLis
port, ok := sp.ports[srcPort]
if ok {
port.unsubscribe(listener)
if len(port.listeners) == 0 {
endpointsVecs.unregister(sp.metricsLabels(srcPort))
delete(sp.ports, srcPort)
}
}
}
@ -322,12 +329,15 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
exists = true
}
log := sp.log.WithField("port", srcPort)
port := &portPublisher{
listeners: []EndpointUpdateListener{},
targetPort: targetPort,
exists: exists,
k8sAPI: sp.k8sAPI,
log: sp.log.WithField("port", srcPort),
log: log,
metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort)),
}
endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name)
@ -341,6 +351,10 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
return port
}
// metricsLabels builds the Prometheus label set that identifies the endpoints
// metrics of this service for the given port: the service's namespace and
// name, plus the port rendered as a decimal string.
func (sp *servicePublisher) metricsLabels(port Port) prometheus.Labels {
	portLabel := strconv.Itoa(int(port))
	return endpointsLabels(sp.id.Namespace, sp.id.Name, portLabel)
}
/////////////////////
/// portPublisher ///
/////////////////////
@ -368,6 +382,10 @@ func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
}
pp.exists = true
pp.pods = newPods
pp.metrics.incUpdates()
pp.metrics.setPods(len(pp.pods))
pp.metrics.setExists(true)
}
func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) PodSet {
@ -425,9 +443,14 @@ func (pp *portPublisher) updatePort(targetPort namedPort) {
// noEndpoints records that this port currently has no endpoints. It stores
// exists, replaces the pod set with an empty one, and notifies every listener
// through NoEndpoints(exists). Afterwards it counts the change as an update
// and sets the exists and pods gauges to match the new state.
func (pp *portPublisher) noEndpoints(exists bool) {
	pp.exists = exists
	pp.pods = make(PodSet)
	for _, listener := range pp.listeners {
		listener.NoEndpoints(exists)
	}
	// Metrics are recorded after the listeners have been notified.
	pp.metrics.incUpdates()
	pp.metrics.setExists(exists)
	pp.metrics.setPods(0)
}
func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
@ -441,6 +464,8 @@ func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
listener.NoEndpoints(false)
}
pp.listeners = append(pp.listeners, listener)
pp.metrics.setSubscribers(len(pp.listeners))
}
func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
@ -450,9 +475,11 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
pp.listeners[i] = pp.listeners[n-1]
pp.listeners[n-1] = nil
pp.listeners = pp.listeners[:n-1]
return
break
}
}
pp.metrics.setSubscribers(len(pp.listeners))
}
////////////

View File

@ -8,6 +8,7 @@ import (
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha1"
splisters "github.com/linkerd/linkerd2/controller/gen/client/listers/serviceprofile/v1alpha1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
logging "github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
@ -29,7 +30,8 @@ type (
profile *sp.ServiceProfile
listeners []ProfileUpdateListener
log *logging.Entry
log *logging.Entry
profileMetrics metrics
// All access to the profilePublisher is explicitly synchronized by this mutex.
sync.Mutex
}
@ -40,6 +42,8 @@ type (
}
)
// profileVecs holds the profile_subscribers and profile_updates metric
// vectors, labeled by the namespace and name of the service profile.
var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"})
// NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for
// service profile changes.
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) *ProfileWatcher {
@ -151,6 +155,10 @@ func (pw *ProfileWatcher) getOrNewProfilePublisher(id ProfileID, profile *sp.Ser
"ns": id.Namespace,
"profile": id.Name,
}),
profileMetrics: profileVecs.newMetrics(prometheus.Labels{
"namespace": id.Namespace,
"profile": id.Name,
}),
}
pw.profiles[id] = publisher
}
@ -175,6 +183,8 @@ func (pp *profilePublisher) subscribe(listener ProfileUpdateListener) {
pp.listeners = append(pp.listeners, listener)
listener.Update(pp.profile)
pp.profileMetrics.setSubscribers(len(pp.listeners))
}
// unsubscribe removes the listener if it is currently subscribed and then
// updates the subscribers gauge to the number of remaining listeners.
@ -190,9 +200,11 @@ func (pp *profilePublisher) unsubscribe(listener ProfileUpdateListener) {
pp.listeners[i] = pp.listeners[n-1]
pp.listeners[n-1] = nil
pp.listeners = pp.listeners[:n-1]
return
break
}
}
pp.profileMetrics.setSubscribers(len(pp.listeners))
}
func (pp *profilePublisher) update(profile *sp.ServiceProfile) {
@ -204,6 +216,8 @@ func (pp *profilePublisher) update(profile *sp.ServiceProfile) {
for _, listener := range pp.listeners {
listener.Update(profile)
}
pp.profileMetrics.incUpdates()
}
////////////

View File

@ -0,0 +1,154 @@
package watcher
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
)
// metricsVecs bundles the metric vectors common to every kind of watcher: a
// gauge of current subscribers and a counter of updates. labelNames records
// the label names the vectors were created with.
type metricsVecs struct {
	labelNames  []string
	subscribers *prometheus.GaugeVec
	updates     *prometheus.CounterVec
}

// metrics holds the children of a metricsVecs that are bound to one concrete
// label set, along with that label set.
type metrics struct {
	labels      prometheus.Labels
	subscribers prometheus.Gauge
	updates     prometheus.Counter
}

// endpointsMetricsVecs extends metricsVecs with the vectors that only apply
// to endpoints: the number of pods and whether the endpoints exists.
type endpointsMetricsVecs struct {
	metricsVecs
	pods   *prometheus.GaugeVec
	exists *prometheus.GaugeVec
}

// endpointsMetrics extends metrics with the endpoints-only gauges bound to
// one concrete label set.
type endpointsMetrics struct {
	metrics
	pods   prometheus.Gauge
	exists prometheus.Gauge
}
// newMetricsVecs creates the subscribers gauge vector and the updates counter
// vector for one kind of watched resource. name is used as the prefix of both
// metric names ("<name>_subscribers" and "<name>_updates") and in their help
// text; labels lists the label names of both vectors.
func newMetricsVecs(name string, labels []string) metricsVecs {
	subscriberOpts := prometheus.GaugeOpts{
		Name: fmt.Sprintf("%s_subscribers", name),
		Help: fmt.Sprintf("A gauge for the current number of subscribers to a %s.", name),
	}
	updateOpts := prometheus.CounterOpts{
		Name: fmt.Sprintf("%s_updates", name),
		Help: fmt.Sprintf("A counter for number of updates to a %s.", name),
	}

	// Fields are evaluated in order, so the gauge vector is still created
	// before the counter vector.
	return metricsVecs{
		labelNames:  labels,
		subscribers: promauto.NewGaugeVec(subscriberOpts, labels),
		updates:     promauto.NewCounterVec(updateOpts, labels),
	}
}
// endpointsLabels returns the label set used by the endpoints metrics, with
// the keys "namespace", "service", and "port" set to the given values.
func endpointsLabels(namespace, service, port string) prometheus.Labels {
	labels := make(prometheus.Labels, 3)
	labels["namespace"] = namespace
	labels["service"] = service
	labels["port"] = port
	return labels
}
// labelNames returns the keys of labels as a slice. The result is never nil,
// and because it is produced by ranging over a map its order is unspecified.
func labelNames(labels prometheus.Labels) []string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	return names
}
// newEndpointsMetricsVecs creates all metric vectors for endpoints: the
// common subscribers/updates vectors under the "endpoints" prefix, plus the
// endpoints_pods and endpoints_exists gauge vectors. Every vector uses the
// label names of endpointsLabels.
func newEndpointsMetricsVecs() endpointsMetricsVecs {
	// Derive the names from an empty label set so they always match the keys
	// that endpointsLabels produces.
	names := labelNames(endpointsLabels("", "", ""))

	return endpointsMetricsVecs{
		metricsVecs: newMetricsVecs("endpoints", names),
		pods: promauto.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "endpoints_pods",
				Help: "A gauge for the current number of pods in a endpoints.",
			},
			names,
		),
		exists: promauto.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "endpoints_exists",
				Help: "A gauge which is 1 if the endpoints exists and 0 if it does not.",
			},
			names,
		),
	}
}
// newMetrics binds the subscribers and updates vectors to labels and returns
// the resulting metrics, which also keeps a reference to labels.
func (mv metricsVecs) newMetrics(labels prometheus.Labels) metrics {
	bound := metrics{labels: labels}
	bound.subscribers = mv.subscribers.With(labels)
	bound.updates = mv.updates.With(labels)
	return bound
}
// newEndpointsMetrics binds every endpoints vector (subscribers, updates,
// pods, and exists) to labels and returns the resulting endpointsMetrics.
func (emv endpointsMetricsVecs) newEndpointsMetrics(labels prometheus.Labels) endpointsMetrics {
	return endpointsMetrics{
		metrics: emv.newMetrics(labels),
		pods:    emv.pods.With(labels),
		exists:  emv.exists.With(labels),
	}
}
// unregister deletes the children identified by labels from all four
// endpoints vectors. A failed deletion is logged as a warning and does not
// stop the remaining vectors from being processed.
func (emv endpointsMetricsVecs) unregister(labels prometheus.Labels) {
	warnUnlessDeleted := func(deleted bool, format string) {
		if !deleted {
			log.Warnf(format, labels)
		}
	}

	warnUnlessDeleted(emv.subscribers.Delete(labels), "unable to delete endpoints_subscribers metric with labels %s")
	warnUnlessDeleted(emv.updates.Delete(labels), "unable to delete endpoints_updates metric with labels %s")
	warnUnlessDeleted(emv.pods.Delete(labels), "unable to delete endpoints_pods metric with labels %s")
	warnUnlessDeleted(emv.exists.Delete(labels), "unable to delete endpoints_exists metric with labels %s")
}
// setSubscribers sets the subscribers gauge to n.
func (m metrics) setSubscribers(n int) {
	count := float64(n)
	m.subscribers.Set(count)
}
// incUpdates increments the updates counter by one.
func (m metrics) incUpdates() {
	m.updates.Inc()
}
// setPods sets the pods gauge to n.
func (em endpointsMetrics) setPods(n int) {
	count := float64(n)
	em.pods.Set(count)
}
// setExists sets the exists gauge to 1 when exists is true and to 0
// otherwise.
func (em endpointsMetrics) setExists(exists bool) {
	value := 0.0
	if exists {
		value = 1.0
	}
	em.exists.Set(value)
}

View File

@ -7,6 +7,7 @@ import (
ts "github.com/deislabs/smi-sdk-go/pkg/apis/split/v1alpha1"
tslisters "github.com/deislabs/smi-sdk-go/pkg/gen/client/split/listers/split/v1alpha1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
logging "github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
@ -28,7 +29,8 @@ type (
split *ts.TrafficSplit
listeners []TrafficSplitUpdateListener
log *logging.Entry
log *logging.Entry
splitMetrics metrics
// All access to the trafficSplitPublisher is explicitly synchronized by this mutex.
sync.Mutex
}
@ -39,6 +41,8 @@ type (
}
)
// splitVecs holds the trafficsplit_subscribers and trafficsplit_updates
// metric vectors, labeled by the namespace and name of the service.
var splitVecs = newMetricsVecs("trafficsplit", []string{"namespace", "service"})
// NewTrafficSplitWatcher creates a TrafficSplitWatcher and begins watching the k8sAPI for
// TrafficSplit changes.
func NewTrafficSplitWatcher(k8sAPI *k8s.API, log *logging.Entry) *TrafficSplitWatcher {
@ -141,6 +145,10 @@ func (tsw *TrafficSplitWatcher) getOrNewTrafficSplitPublisher(id ServiceID, spli
"ns": id.Namespace,
"service": id.Name,
}),
splitMetrics: splitVecs.newMetrics(prometheus.Labels{
"namespace": id.Namespace,
"service": id.Name,
}),
}
tsw.publishers[id] = publisher
}
@ -165,6 +173,8 @@ func (tsp *trafficSplitPublisher) subscribe(listener TrafficSplitUpdateListener)
tsp.listeners = append(tsp.listeners, listener)
listener.UpdateTrafficSplit(tsp.split)
tsp.splitMetrics.setSubscribers(len(tsp.listeners))
}
func (tsp *trafficSplitPublisher) unsubscribe(listener TrafficSplitUpdateListener) {
@ -178,9 +188,11 @@ func (tsp *trafficSplitPublisher) unsubscribe(listener TrafficSplitUpdateListene
tsp.listeners[i] = tsp.listeners[n-1]
tsp.listeners[n-1] = nil
tsp.listeners = tsp.listeners[:n-1]
return
break
}
}
tsp.splitMetrics.setSubscribers(len(tsp.listeners))
}
func (tsp *trafficSplitPublisher) update(split *ts.TrafficSplit) {
@ -192,4 +204,6 @@ func (tsp *trafficSplitPublisher) update(split *ts.TrafficSplit) {
for _, listener := range tsp.listeners {
listener.UpdateTrafficSplit(split)
}
tsp.splitMetrics.incUpdates()
}

View File

@ -26,7 +26,7 @@ COPY web/app .
RUN $ROOT/bin/web build
## compile go server
FROM gcr.io/linkerd-io/go-deps:7ac58ac0 as golang
FROM gcr.io/linkerd-io/go-deps:4c8f4294 as golang
WORKDIR /go/src/github.com/linkerd/linkerd2
RUN mkdir -p web
COPY web/main.go web