Modify simulate proxy to expose prometheus metrics (#576)

The simulate-proxy script pushes metrics to the telemetry service. This PR modifies the script to expose metrics to a prometheus endpoint. This functionality creates a server that randomly generates request_total, response_total, request_duration_ms and response_latency_ms metrics. The server reads pod information from a k8s cluster and picks a random namespace to use for all exposed metrics.

Tested out these changes with a locally running prometheus server. I also ran the docker-compose.yml to make sure metrics were being recorded by the prometheus docker container.

fixes #498

Signed-off-by: Dennis Adjei-Baah <dennis@buoyant.io>
This commit is contained in:
Dennis Adjei-Baah 2018-03-21 16:40:12 -07:00 committed by GitHub
parent 1c9ce4d118
commit b90668a0b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 279 additions and 179 deletions

View File

@ -31,3 +31,10 @@ scrape_configs:
- job_name: 'destination' - job_name: 'destination'
static_configs: static_configs:
- targets: ['destination:9999'] - targets: ['destination:9999']
- job_name: 'conduit'
static_configs:
- targets:
- 'simulate-proxy:9000'
- 'simulate-proxy:9001'
- 'simulate-proxy:9002'

View File

@ -167,11 +167,11 @@ traffic to the docker-compose environment:
```bash ```bash
# confirm you are connected to Kubernetes # confirm you are connected to Kubernetes
kubectl version kubectl version
# simulate traffic
bin/go-run controller/script/simulate-proxy --kubeconfig ~/.kube/config --addr $DOCKER_IP:8086 --max-pods 10 --sleep 10ms
``` ```
Note that the Kubernetes cluster your system is configured to talk to must not be referenced via
`localhost` in your Kubernetes config file, as `simulate-proxy` will not be able to connect to it.
This includes Kubernetes on Docker For Mac.
### Testing ### Testing
```bash ```bash

View File

@ -1,20 +1,19 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt"
"math" "math"
"math/rand" "math/rand"
"net/http" "net/http"
"strconv" "os"
"os/signal"
"strings" "strings"
"time" "time"
"github.com/runconduit/conduit/controller/api/proxy" prom "github.com/prometheus/client_golang/prometheus"
common "github.com/runconduit/conduit/controller/gen/common" "github.com/prometheus/client_golang/prometheus/promhttp"
pb "github.com/runconduit/conduit/controller/gen/proxy/telemetry"
"github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -22,9 +21,27 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth" _ "k8s.io/client-go/plugin/pkg/client/auth"
) )
/* A simple script for posting simulated telemetry data to the proxy api */ /* A simple script for exposing simulated prometheus metrics */
// simulatedProxy represents one fake conduit proxy. It owns a single
// deployment's identity and a private prometheus registry into which it
// records randomly generated traffic metrics.
type simulatedProxy struct {
	podOwner       string        // "namespace/deployment" this proxy belongs to
	deploymentName string        // deployment component of podOwner
	sleep          time.Duration // pause between metric-generation rounds
	namespace      string        // namespace component of podOwner
	deployments    []string      // all known "namespace/deployment" names; destinations are picked from these
	registerer     *prom.Registry // per-proxy registry, served on this proxy's own HTTP address
	*proxyMetricCollectors
}

// proxyMetricCollectors holds the four metric families the simulator exposes;
// each is filled with random observations by generateProxyTraffic.
type proxyMetricCollectors struct {
	requestTotals     *prom.CounterVec
	responseTotals    *prom.CounterVec
	requestDurationMs *prom.HistogramVec
	responseLatencyMs *prom.HistogramVec
}
var ( var (
labels = generatePromLabels()
grpcResponseCodes = []codes.Code{ grpcResponseCodes = []codes.Code{
codes.OK, codes.OK,
codes.PermissionDenied, codes.PermissionDenied,
@ -93,23 +110,20 @@ var (
http.StatusNetworkAuthenticationRequired, http.StatusNetworkAuthenticationRequired,
} }
ports = []uint32{3333, 6262}
// latencyBucketBounds holds the maximum value (inclusive, in tenths of a // latencyBucketBounds holds the maximum value (inclusive, in tenths of a
// millisecond) that may be counted in a given histogram bucket. // millisecond) that may be counted in a given histogram bucket.
// These values are one order of magnitude greater than the controller's // These values are one order of magnitude greater than the controller's
// Prometheus buckets, because the proxy will report latencies in tenths // of a millisecond rather than whole milliseconds.
// of a millisecond rather than whole milliseconds. // of a millisecond rather than whole milliseconds.
latencyBucketBounds = [26]uint32{ latencyBucketBounds = []float64{
// prometheus.LinearBuckets(1, 1, 5), // prometheus.LinearBuckets(1, 1, 5),
10, 20, 30, 40, 50, 10, 20, 30, 40, 50,
// prometheus.LinearBuckets(10, 10, 5), // prometheus.LinearBuckets(10, 10, 5),
100, 200, 300, 400, 50, 100, 200, 300, 400, 500,
// prometheus.LinearBuckets(100, 100, 5), // prometheus.LinearBuckets(100, 100, 5),
1000, 2000, 3000, 4000, 5000, 1000, 2000, 3000, 4000, 5000,
// prometheus.LinearBuckets(1000, 1000, 5), // prometheus.LinearBuckets(1000, 1000, 5),
10000, 20000, 30000, 40000, 5000, 10000, 20000, 30000, 40000, 50000,
// prometheus.LinearBuckets(10000, 10000, 5), // prometheus.LinearBuckets(10000, 10000, 5),
100000, 200000, 300000, 400000, 500000, 100000, 200000, 300000, 400000, 500000,
// Prometheus implicitly creates a max bucket for everything that // Prometheus implicitly creates a max bucket for everything that
@ -119,8 +133,122 @@ var (
} }
) )
func randomPort() uint32 { // generateProxyTraffic randomly creates metrics under the guise of a single conduit proxy routing traffic.
return ports[rand.Intn(len(ports))] // metrics are generated for each proxyMetricCollector.
// generateProxyTraffic loops forever, fabricating counter increments and
// latency histogram observations under the guise of a single conduit proxy
// routing traffic. One round of metrics is produced per iteration for each
// proxyMetricCollector, followed by a pause of s.sleep.
func (s *simulatedProxy) generateProxyTraffic() {
	for {
		requestCount := int(rand.Float64() * 10)
		responseCount := int(rand.Float64() * 10)

		// Pick a destination other than ourselves; deployment names look
		// like "namespace/deployment", so keep only the deployment part.
		target := getRandomDeployment(s.deployments, map[string]struct{}{s.podOwner: {}})
		dstDeployment := strings.Split(target, "/")[1]

		requestLabels := overrideDefaultLabels(s.newConduitLabel(dstDeployment, false))
		s.requestTotals.With(requestLabels).Add(float64(requestCount))

		responseLabels := overrideDefaultLabels(s.newConduitLabel(dstDeployment, true))
		s.responseTotals.With(responseLabels).Add(float64(responseCount))

		observeHistogramVec(
			randomLatencies(randomCount()),
			s.responseLatencyMs,
			overrideDefaultLabels(s.newConduitLabel(dstDeployment, true)))
		observeHistogramVec(
			randomLatencies(randomCount()),
			s.requestDurationMs,
			overrideDefaultLabels(s.newConduitLabel(dstDeployment, false)))

		time.Sleep(s.sleep)
	}
}
// newConduitLabel creates a label map to be used for metric generation.
// A dst_deployment label is attached only for outbound traffic; response
// metrics additionally carry either a gRPC or an HTTP status code.
func (s *simulatedProxy) newConduitLabel(destinationPod string, isResponseLabel bool) prom.Labels {
	lbls := prom.Labels{
		"direction":  randomRequestDirection(),
		"deployment": s.deploymentName,
		"authority":  "world.greeting:7778",
		"namespace":  s.namespace,
	}

	if lbls["direction"] == "outbound" {
		lbls["dst_deployment"] = destinationPod
	}

	if isResponseLabel {
		// Flip a coin between simulating a gRPC response and an HTTP one.
		if rand.Intn(2) == 0 {
			lbls["grpc_status_code"] = fmt.Sprintf("%d", randomGrpcResponseCode())
		} else {
			lbls["status_code"] = fmt.Sprintf("%d", randomHttpResponseCode())
		}
	}
	return lbls
}
// observeHistogramVec uses a latencyBuckets slice which holds an array of
// numbers that indicate how many observations will be added to each bucket.
// latencyBuckets and latencyBucketBounds are of the same length: the bucket
// index selects an upper bound from latencyBucketBounds, and that bound's
// value (converted from tenths of a millisecond to milliseconds) is observed
// the given number of times with the supplied labels.
func observeHistogramVec(latencyBuckets []uint32, latencies *prom.HistogramVec, latencyLabels prom.Labels) {
	for idx, observations := range latencyBuckets {
		// Bounds are in tenths of a millisecond; observations are in ms.
		ms := float64(latencyBucketBounds[idx]) / 10
		for n := uint32(0); n < observations; n++ {
			latencies.With(latencyLabels).Observe(ms)
		}
	}
}
// randomRequestDirection returns "inbound" or "outbound" with equal
// probability.
func randomRequestDirection() string {
	directions := [2]string{"inbound", "outbound"}
	return directions[rand.Intn(2)]
}
// generatePromLabels builds the full list of prometheus label names used by
// every simulated metric: the constant request labels first, then one label
// per kubernetes resource type, then a "dst_"-prefixed copy of each resource
// type for the destination side of outbound traffic.
func generatePromLabels() []string {
	resourceTypes := []string{
		"job",
		"replica_set",
		"deployment",
		"daemon_set",
		"replication_controller",
		"namespace",
	}

	promLabels := []string{
		"direction",
		"authority",
		"status_code",
		"grpc_status_code",
	}
	promLabels = append(promLabels, resourceTypes...)
	for _, resource := range resourceTypes {
		promLabels = append(promLabels, fmt.Sprintf("dst_%s", resource))
	}
	return promLabels
}
// overrideDefaultLabels returns a label map containing every label name from
// the package-level labels list: names present in overrides keep the override
// value, and every other name is set to "" (a key absent from overrides reads
// as the empty string).
func overrideDefaultLabels(overrides map[string]string) map[string]string {
	merged := generateLabelMap(labels)
	for name := range merged {
		merged[name] = overrides[name]
	}
	return merged
}
// generateLabelMap expands a list of label names into a map in which every
// name is present with an empty-string value.
func generateLabelMap(labels []string) map[string]string {
	m := make(map[string]string, len(labels))
	for _, name := range labels {
		m[name] = ""
	}
	return m
}
} }
func randomCount() uint32 { func randomCount() uint32 {
@ -138,30 +266,6 @@ func randomLatencies(count uint32) []uint32 {
return latencies return latencies
} }
func randomGrpcEos(count uint32) (eos []*pb.EosScope) {
grpcResponseCodes := make(map[uint32]uint32)
for i := uint32(0); i < count; i++ {
grpcResponseCodes[randomGrpcResponseCode()] += 1
}
for code, streamCount := range grpcResponseCodes {
eos = append(eos, &pb.EosScope{
Ctx: &pb.EosCtx{End: &pb.EosCtx_GrpcStatusCode{GrpcStatusCode: code}},
Streams: streamCount,
})
}
return
}
func randomH2Eos(count uint32) (eos []*pb.EosScope) {
for i := uint32(0); i < count; i++ {
eos = append(eos, &pb.EosScope{
Ctx: &pb.EosCtx{End: &pb.EosCtx_Other{Other: true}},
Streams: uint32(rand.Int31()),
})
}
return
}
func randomGrpcResponseCode() uint32 { func randomGrpcResponseCode() uint32 {
return uint32(grpcResponseCodes[rand.Intn(len(grpcResponseCodes))]) return uint32(grpcResponseCodes[rand.Intn(len(grpcResponseCodes))])
} }
@ -170,44 +274,94 @@ func randomHttpResponseCode() uint32 {
return uint32(httpResponseCodes[rand.Intn(len(httpResponseCodes))]) return uint32(httpResponseCodes[rand.Intn(len(httpResponseCodes))])
} }
func stringToIp(str string) *common.IPAddress {
octets := make([]uint8, 0)
for _, num := range strings.Split(str, ".") {
oct, _ := strconv.Atoi(num)
octets = append(octets, uint8(oct))
}
return util.IPV4(octets[0], octets[1], octets[2], octets[3])
}
func podIndexFunc(obj interface{}) ([]string, error) { func podIndexFunc(obj interface{}) ([]string, error) {
return nil, nil return nil, nil
} }
func randomPod(pods []*v1.Pod, prvPodIp *common.IPAddress) *common.IPAddress { func getRandomDeployment(deployments []string, excludeDeployments map[string]struct{}) string {
var podIp *common.IPAddress filteredDeployments := make([]string, 0)
for {
if podIp != nil {
break
}
randomPod := pods[rand.Intn(len(pods))] for _, deployment := range deployments {
if strings.HasPrefix(randomPod.GetNamespace(), "kube-") { if _, ok := excludeDeployments[deployment]; !ok {
continue // skip pods in the kube-* namespaces filteredDeployments = append(filteredDeployments, deployment)
}
podIp = stringToIp(randomPod.Status.PodIP)
if prvPodIp != nil && podIp.GetIpv4() == prvPodIp.GetIpv4() {
podIp = nil
} }
} }
return podIp return filteredDeployments[rand.Intn(len(filteredDeployments))]
}
// newSimulatedProxy builds a proxy simulator for one deployment, wiring up a
// private prometheus registry with the four metric families to expose.
// podOwner is expected to be of the form "namespace/name"; a value without a
// "/" would panic on the index below — NOTE(review): callers appear to pass
// deployment names built that way, confirm against getDeployments.
func newSimulatedProxy(podOwner string, deployments []string, sleep *time.Duration) *simulatedProxy {
	podOwnerComponents := strings.Split(podOwner, "/")
	name := podOwnerComponents[1]      // deployment name
	namespace := podOwnerComponents[0] // kubernetes namespace
	proxy := simulatedProxy{
		podOwner: podOwner,
		sleep: *sleep,
		deployments: deployments,
		namespace: namespace,
		deploymentName: name,
		// A dedicated registry (rather than the default one) so each
		// simulated proxy can be scraped on its own address.
		registerer: prom.NewRegistry(),
		proxyMetricCollectors: &proxyMetricCollectors{
			requestTotals: prom.NewCounterVec(
				prom.CounterOpts{
					Name: "request_total",
					Help: "A counter of the number of requests the proxy has received",
				}, labels),
			responseTotals: prom.NewCounterVec(
				prom.CounterOpts{
					Name: "response_total",
					Help: "A counter of the number of responses the proxy has received.",
				}, labels),
			requestDurationMs: prom.NewHistogramVec(
				prom.HistogramOpts{
					Name: "request_duration_ms",
					Help: "A histogram of the duration of a response",
					Buckets: latencyBucketBounds,
				}, labels),
			responseLatencyMs: prom.NewHistogramVec(
				prom.HistogramOpts{
					Name: "response_latency_ms",
					Help: "A histogram of the total latency of a response.",
					Buckets: latencyBucketBounds,
				}, labels),
		},
	}
	// Register every collector so the registry's HTTP handler exposes them;
	// MustRegister panics on registration failure (e.g. duplicates).
	proxy.registerer.MustRegister(
		proxy.requestTotals,
		proxy.responseTotals,
		proxy.requestDurationMs,
		proxy.responseLatencyMs,
	)
	return &proxy
}
func getDeployments(podList []*v1.Pod, deploys *k8s.ReplicaSetStore, maxPods *int) []string {
allPods := make([]*v1.Pod, 0)
deploymentSet := make(map[string]struct{})
for _, pod := range podList {
if pod.Status.PodIP != "" && !strings.HasPrefix(pod.GetNamespace(), "kube-") && (*maxPods == 0 || len(allPods) < *maxPods) {
allPods = append(allPods, pod)
deploymentName, err := deploys.GetDeploymentForPod(pod)
if err != nil {
log.Fatal(err.Error())
}
deploymentSet[deploymentName] = struct{}{}
}
}
deployments := make([]string, 0)
for deployment := range deploymentSet {
deployments = append(deployments, deployment)
}
return deployments
} }
func main() { func main() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
addr := flag.String("addr", ":8086", "address of proxy api")
requestCount := flag.Int("requests", 0, "number of api requests to make (default: infinite)")
sleep := flag.Duration("sleep", time.Second, "time to sleep between requests") sleep := flag.Duration("sleep", time.Second, "time to sleep between requests")
metricsAddrs := flag.String("metric-addrs", ":9000,:9001,:9002", "range of network addresses to serve prometheus metrics")
maxPods := flag.Int("max-pods", 0, "total number of pods to simulate (default unlimited)") maxPods := flag.Int("max-pods", 0, "total number of pods to simulate (default unlimited)")
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config - required") kubeConfigPath := flag.String("kubeconfig", "", "path to kube config - required")
flag.Parse() flag.Parse()
@ -217,12 +371,6 @@ func main() {
return return
} }
client, conn, err := proxy.NewTelemetryClient(*addr)
if err != nil {
log.Fatal(err.Error())
}
defer conn.Close()
clientSet, err := k8s.NewClientSet(*kubeConfigPath) clientSet, err := k8s.NewClientSet(*kubeConfigPath)
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
@ -233,120 +381,49 @@ func main() {
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
deploys, err := k8s.NewReplicaSetStore(clientSet)
if err != nil {
log.Fatal(err.Error())
}
err = pods.Run() err = pods.Run()
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
err = deploys.Run()
if err != nil {
log.Fatal(err.Error())
}
podList, err := pods.List() podList, err := pods.List()
if err != nil { if err != nil {
log.Fatal(err.Error()) log.Fatal(err.Error())
} }
allPods := make([]*v1.Pod, 0) deployments := getDeployments(podList, deploys, maxPods)
for _, pod := range podList {
if pod.Status.PodIP != "" && (*maxPods == 0 || len(allPods) < *maxPods) { stopCh := make(chan os.Signal)
allPods = append(allPods, pod) signal.Notify(stopCh, os.Interrupt, os.Kill)
}
} excludedDeployments := map[string]struct{}{}
for i := 0; (*requestCount == 0) || (i < *requestCount); i++ { for _, addr := range strings.Split(*metricsAddrs, ",") {
count := randomCount() randomPodOwner := getRandomDeployment(deployments, excludedDeployments)
sourceIp := randomPod(allPods, nil) excludedDeployments[randomPodOwner] = struct{}{}
targetIp := randomPod(allPods, sourceIp)
go func(address string, podOwner string, deployments []string) {
req := &pb.ReportRequest{
Process: &pb.Process{ proxy := newSimulatedProxy(podOwner, deployments, sleep)
ScheduledInstance: "hello-1mfa0", server := &http.Server{
ScheduledNamespace: "people", Addr: address,
}, Handler: promhttp.HandlerFor(proxy.registerer, promhttp.HandlerOpts{}),
ClientTransports: []*pb.ClientTransport{ }
// TCP log.Infof("serving scrapable metrics on %s", address)
&pb.ClientTransport{ go server.ListenAndServe()
TargetAddr: &common.TcpAddress{ go proxy.generateProxyTraffic()
Ip: targetIp,
Port: randomPort(), }(addr, randomPodOwner, deployments)
},
Connects: count,
Disconnects: []*pb.TransportSummary{
&pb.TransportSummary{
DurationMs: uint64(randomCount()),
BytesSent: uint64(randomCount()),
},
},
Protocol: common.Protocol_TCP,
},
},
ServerTransports: []*pb.ServerTransport{
// TCP
&pb.ServerTransport{
SourceIp: sourceIp,
Connects: count,
Disconnects: []*pb.TransportSummary{
&pb.TransportSummary{
DurationMs: uint64(randomCount()),
BytesSent: uint64(randomCount()),
},
},
Protocol: common.Protocol_TCP,
},
},
Proxy: pb.ReportRequest_INBOUND,
Requests: []*pb.RequestScope{
// gRPC
&pb.RequestScope{
Ctx: &pb.RequestCtx{
SourceIp: sourceIp,
TargetAddr: &common.TcpAddress{
Ip: targetIp,
Port: randomPort(),
},
Authority: "world.greeting:7778",
},
Count: count,
Responses: []*pb.ResponseScope{
&pb.ResponseScope{
Ctx: &pb.ResponseCtx{
HttpStatusCode: http.StatusOK,
},
ResponseLatencyCounts: randomLatencies(count),
Ends: randomGrpcEos(count),
},
},
},
// HTTP/2
&pb.RequestScope{
Ctx: &pb.RequestCtx{
SourceIp: sourceIp,
TargetAddr: &common.TcpAddress{
Ip: targetIp,
Port: randomPort(),
},
Authority: "world.greeting:7778",
},
Count: count,
Responses: []*pb.ResponseScope{
&pb.ResponseScope{
Ctx: &pb.ResponseCtx{
HttpStatusCode: randomHttpResponseCode(),
},
ResponseLatencyCounts: randomLatencies(count),
Ends: randomH2Eos(count),
},
},
},
},
HistogramBucketBoundsTenthMs: latencyBucketBounds[:],
}
_, err = client.Report(context.Background(), req)
if err != nil {
log.Fatal(err.Error())
}
time.Sleep(*sleep)
} }
<-stopCh
} }

View File

@ -110,3 +110,19 @@ services:
command: command:
- --config.file=/etc/prometheus/prometheus.yml - --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.retention=6h - --storage.tsdb.retention=6h
simulate-proxy:
image: golang:1.10.0-alpine3.7
ports:
- 9000-9002:9000-9002
volumes:
- .:/go/src/github.com/runconduit/conduit
- ~/.kube/config:/kubeconfig:ro
command:
- go
- run
- /go/src/github.com/runconduit/conduit/controller/script/simulate-proxy/main.go
- --metric-addrs=0.0.0.0:9000,0.0.0.0:9001,0.0.0.0:9002
- --kubeconfig=/kubeconfig
- --max-pods=10
- --sleep=3s