Use Prometheus to track added data plane pods. (#338)

The instance cache that powers the ListPods API is stored in memory in the telemetry service. When the telemetry service runs with multiple replicas, each replica therefore has a distinct, incomplete view of the added pods, limited to whichever pods happen to report to that replica. As a result, not all of the data plane bubbles on the dashboard are filled in, and they flicker with each data refresh.

We create a Prometheus counter called reports_total, labeled by pod. Whenever a telemetry service instance receives a report from a pod, it increments reports_total for that pod. This lets us remove the in-memory instance cache entirely and instead query Prometheus to determine whether each pod has sent a report in the last 30 seconds.
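For illustration, here is a minimal sketch in Go of the two halves of this approach: incrementing a pod-labeled counter on each report, and range-querying it to find pods that reported in the last 30 seconds. It assumes the Prometheus client version used in this codebase, where v1.API.Query returns (model.Value, error); onReport, recentReporters, and promURL are hypothetical names for this sketch, not identifiers from the repository.

package telemetry

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
)

// A pod-labeled counter: every report from a pod bumps that pod's series.
var reportsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "reports_total",
		Help: "Total number of telemetry reports received",
	},
	[]string{"pod"},
)

func init() {
	prometheus.MustRegister(reportsTotal)
}

// onReport (hypothetical name) records one report from the pod "namespace/name".
func onReport(id string) {
	reportsTotal.With(prometheus.Labels{"pod": id}).Inc()
}

// recentReporters (hypothetical name) returns, for each pod with at least one
// report in the last 30 seconds, the time of its most recent report. promURL
// is a placeholder, e.g. "http://prometheus:9090". A range selector such as
// reports_total[30s] evaluates to a matrix: one series per pod, containing
// the samples observed inside the window.
func recentReporters(ctx context.Context, promURL string) (map[string]time.Time, error) {
	client, err := api.NewClient(api.Config{Address: promURL})
	if err != nil {
		return nil, err
	}
	res, err := promv1.NewAPI(client).Query(ctx, "reports_total[30s]", time.Now())
	if err != nil {
		return nil, err
	}
	matrix, ok := res.(model.Matrix)
	if !ok {
		return nil, fmt.Errorf("unexpected query result type (expected Matrix): %s", res.Type())
	}
	reports := make(map[string]time.Time)
	for _, series := range matrix {
		// The last sample in the window is the pod's most recent report.
		last := series.Values[len(series.Values)-1]
		reports[string(series.Metric["pod"])] = last.Timestamp.Time()
	}
	return reports, nil
}

Because the query only looks at a 30-second window, series for pods that stop reporting simply drop out of the result, so no explicit purge loop (like the removed purgeOldInstances below) is needed.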

Fixes #337

Signed-off-by: Alex Leong <alex@buoyant.io>
Alex Leong, 2018-02-14 16:09:55 -08:00, committed by GitHub
parent 300fd3475b
commit 552204366c
1 changed file with 37 additions and 49 deletions


@@ -6,7 +6,6 @@ import (
 	"net"
 	"net/http"
 	"strconv"
-	"sync"
 	"time"

 	"github.com/golang/protobuf/ptypes/duration"
@@ -28,6 +27,10 @@ import (
 	k8sV1 "k8s.io/api/core/v1"
 )

+const (
+	reportsMetric = "reports_total"
+)
+
 var (
 	requestLabels = []string{"source_deployment", "target_deployment"}
 	requestsTotal = prometheus.NewCounterVec(
@@ -63,12 +66,22 @@ var (
 		},
 		requestLabels,
 	)
+
+	reportsLabels = []string{"pod"}
+	reportsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: reportsMetric,
+			Help: "Total number of telemetry reports received",
+		},
+		reportsLabels,
+	)
 )

 func init() {
 	prometheus.MustRegister(requestsTotal)
 	prometheus.MustRegister(responsesTotal)
 	prometheus.MustRegister(responseLatency)
+	prometheus.MustRegister(reportsTotal)
 }

 type (
@@ -76,52 +89,10 @@ type (
 		prometheusAPI v1.API
 		pods          *k8s.PodIndex
 		replicaSets   *k8s.ReplicaSetStore
-		instances     instanceCache
 		ignoredNamespaces []string
 	}
-
-	instanceCache struct {
-		sync.RWMutex
-		cache map[string]time.Time
-	}
 )
-
-func (c *instanceCache) update(id string) {
-	c.Lock()
-	defer c.Unlock()
-	c.cache[id] = time.Now()
-}
-
-func (c *instanceCache) list() []string {
-	c.RLock()
-	defer c.RUnlock()
-	instances := make([]string, 0)
-	for name, _ := range c.cache {
-		instances = append(instances, name)
-	}
-	return instances
-}
-
-func (c *instanceCache) purgeOldInstances() {
-	c.Lock()
-	defer c.Unlock()
-	expiry := time.Now().Add(-10 * time.Minute)
-	for name, time := range c.cache {
-		if time.Before(expiry) {
-			delete(c.cache, name)
-		}
-	}
-}
-
-func cleanupOldInstances(srv *server) {
-	for _ = range time.Tick(10 * time.Second) {
-		srv.instances.purgeOldInstances()
-	}
-}

 func podIPKeyFunc(obj interface{}) ([]string, error) {
 	if pod, ok := obj.(*k8sV1.Pod); ok {
 		return []string{pod.Status.PodIP}, nil
@@ -162,10 +133,8 @@ func NewServer(addr, prometheusUrl string, ignoredNamespaces []string, kubeconfi
 		prometheusAPI:     v1.NewAPI(prometheusClient),
 		pods:              pods,
 		replicaSets:       replicaSets,
-		instances:         instanceCache{cache: make(map[string]time.Time, 0)},
 		ignoredNamespaces: ignoredNamespaces,
 	}
-	go cleanupOldInstances(srv)

 	lis, err := net.Listen("tcp", addr)
 	if err != nil {
@@ -218,7 +187,7 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query
 	if res.Type() != model.ValMatrix {
 		err = fmt.Errorf("Unexpected query result type (expected Matrix): %s", res.Type())
-		log.Errorf("%s", err)
+		log.Error(err)
 		return nil, err
 	}

 	for _, s := range res.(model.Matrix) {
@@ -236,7 +205,7 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query
 	if res.Type() != model.ValVector {
 		err = fmt.Errorf("Unexpected query result type (expected Vector): %s", res.Type())
-		log.Errorf("%s", err)
+		log.Error(err)
 		return nil, err
 	}

 	for _, s := range res.(model.Vector) {
@@ -255,6 +224,25 @@ func (s *server) ListPods(ctx context.Context, req *read.ListPodsRequest) (*publ
 		return nil, err
 	}

+	// Reports is a map from instance name to the absolute time of the most recent
+	// report from that instance.
+	reports := make(map[string]time.Time)
+	// Query Prometheus for reports in the last 30 seconds.
+	res, err := s.prometheusAPI.Query(ctx, reportsMetric + "[30s]", time.Time{})
+	if err != nil {
+		return nil, err
+	}
+	if res.Type() != model.ValMatrix {
+		err = fmt.Errorf("Unexpected query result type (expected Matrix): %s", res.Type())
+		log.Error(err)
+		return nil, err
+	}
+	for _, s := range res.(model.Matrix) {
+		labels := metricToMap(s.Metric)
+		timestamp := s.Values[len(s.Values)-1].Timestamp
+		reports[labels["pod"]] = time.Unix(0, int64(timestamp)*int64(time.Millisecond))
+	}
+
 	podList := make([]*public.Pod, 0)
 	for _, pod := range pods {
@@ -267,7 +255,7 @@ func (s *server) ListPods(ctx context.Context, req *read.ListPodsRequest) (*publ
 			deployment = ""
 		}
 		name := pod.Namespace + "/" + pod.Name
-		updated, added := s.instances.cache[name]
+		updated, added := reports[name]

 		status := string(pod.Status.Phase)
 		if pod.DeletionTimestamp != nil {
@@ -310,7 +298,7 @@ func (s *server) Report(ctx context.Context, req *write.ReportRequest) (*write.R
 	logCtx := log.WithFields(log.Fields{"id": id})
 	logCtx.Debugf("Received report with %d requests", len(req.Requests))

-	s.instances.update(id)
+	reportsTotal.With(prometheus.Labels{"pod": id}).Inc()

 	for _, requestScope := range req.Requests {
 		if requestScope.Ctx == nil {