From 97546e0646f37f5a42fd54b852c723cecc7f2e9c Mon Sep 17 00:00:00 2001 From: Andrew Seigner Date: Fri, 30 Mar 2018 13:28:45 -0700 Subject: [PATCH] Modify simulate-proxy to be more pod-centric (#653) simulate-proxy uses a deployment object from kubernetes to simulate each proxy metrics endpoint. Modify simulate-proxy to instead use a pod to simulate each proxy metrics endpoint. This ensures that each metrics endpoint consistently represents a pod in kubernetes, including it's namespace, deployment, and label information. This change also adds support for: - a new `metric-ports` flag, default is `10000-10009`. - `classification`, `pod_name`, and `pod_template_hash` labels Signed-off-by: Andrew Seigner --- .prometheus.dev.yml | 21 +- controller/k8s/replicasets.go | 3 + controller/script/simulate-proxy/main.go | 418 +++++++++++++---------- docker-compose.yml | 5 +- 4 files changed, 253 insertions(+), 194 deletions(-) diff --git a/.prometheus.dev.yml b/.prometheus.dev.yml index a1042ca47..5d2f6defd 100644 --- a/.prometheus.dev.yml +++ b/.prometheus.dev.yml @@ -25,14 +25,15 @@ scrape_configs: - job_name: 'conduit-proxy' static_configs: + # use this to generate: https://play.golang.org/p/e7uvgT5sUo9 - targets: - - 'simulate-proxy:9000' - - 'simulate-proxy:9001' - - 'simulate-proxy:9002' - - 'simulate-proxy:9003' - - 'simulate-proxy:9004' - - 'simulate-proxy:9005' - - 'simulate-proxy:9006' - - 'simulate-proxy:9007' - - 'simulate-proxy:9008' - - 'simulate-proxy:9009' + - 'simulate-proxy:10000' + - 'simulate-proxy:10001' + - 'simulate-proxy:10002' + - 'simulate-proxy:10003' + - 'simulate-proxy:10004' + - 'simulate-proxy:10005' + - 'simulate-proxy:10006' + - 'simulate-proxy:10007' + - 'simulate-proxy:10008' + - 'simulate-proxy:10009' diff --git a/controller/k8s/replicasets.go b/controller/k8s/replicasets.go index c6e45aeb0..fae86efd9 100644 --- a/controller/k8s/replicasets.go +++ b/controller/k8s/replicasets.go @@ -69,6 +69,9 @@ func (p *ReplicaSetStore) GetReplicaSet(key string) (*v1beta1.ReplicaSet, error) return rs, nil } +// GetDeploymentForPod returns pod owner information: +// if the pod's replicaset belongs to a deployment: "namespace/deployment" +// if the pod's replicaset does not belong to a deployment: "namespace/replicaset" func (p *ReplicaSetStore) GetDeploymentForPod(pod *v1.Pod) (string, error) { namespace := pod.Namespace if len(pod.GetOwnerReferences()) == 0 { diff --git a/controller/script/simulate-proxy/main.go b/controller/script/simulate-proxy/main.go index 8c8375536..7d7bcec44 100644 --- a/controller/script/simulate-proxy/main.go +++ b/controller/script/simulate-proxy/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "os/signal" + "strconv" "strings" "time" @@ -24,20 +25,19 @@ import ( /* A simple script for exposing simulated prometheus metrics */ type simulatedProxy struct { - podOwner string - deploymentName string - sleep time.Duration - namespace string - deployments []string - registerer *prom.Registry - *proxyMetricCollectors + sleep time.Duration + deployments []string + registerer *prom.Registry + inbound *proxyMetricCollectors + outbound *proxyMetricCollectors } type proxyMetricCollectors struct { - requestTotals *prom.CounterVec - responseTotals *prom.CounterVec - requestDurationMs *prom.HistogramVec - responseLatencyMs *prom.HistogramVec + requestTotals *prom.CounterVec + responseTotals *prom.CounterVec + requestDurationMs *prom.HistogramVec + responseLatencyMs *prom.HistogramVec + responseDurationMs *prom.HistogramVec } const ( @@ -45,33 +45,6 @@ const ( ) var ( - // for reference: https://github.com/runconduit/conduit/blob/master/doc/proxy-metrics.md#labels - labels = []string{ - // kubeResourceTypes - "daemon_set", - "deployment", - "k8s_job", - "replication_controller", - "replica_set", - - "pod_template_hash", - "namespace", - - // constantLabels - "direction", - "authority", - "status_code", - "grpc_status_code", - - // destinationLabels - "dst_daemon_set", - "dst_deployment", - "dst_job", - "dst_replication_controller", - "dst_replica_set", - "dst_namespace", - } - grpcResponseCodes = []codes.Code{ codes.OK, codes.PermissionDenied, @@ -168,110 +141,79 @@ var ( func (s *simulatedProxy) generateProxyTraffic() { for { - inboundRandomCount := int(rand.Float64() * 10) - outboundRandomCount := int(rand.Float64() * 10) + for _, deployment := range s.deployments { - //split the deployment name into ["namespace", "deployment"] - destinationDeploymentName := strings.Split(deployment, "/")[1] - s.requestTotals. - With(overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, false))). - Add(float64(inboundRandomCount)) + // + // inbound + // + inboundRandomCount := int(rand.Float64() * 10) - s.responseTotals. - With(overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, true))). - Add(float64(outboundRandomCount)) + // inbound requests + s.inbound.requestTotals.With(prom.Labels{}).Add(float64(inboundRandomCount)) + for _, latency := range randomLatencies(inboundRandomCount) { + s.inbound.requestDurationMs.With(prom.Labels{}).Observe(latency) + } - observeHistogramVec( - randomLatencies(randomCount()), - s.responseLatencyMs, - overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, true))) + // inbound responses + inboundResponseLabels := randomResponseLabels() + s.inbound.responseTotals.With(inboundResponseLabels).Add(float64(inboundRandomCount)) + for _, latency := range randomLatencies(inboundRandomCount) { + s.inbound.responseLatencyMs.With(inboundResponseLabels).Observe(latency) + } + for _, latency := range randomLatencies(inboundRandomCount) { + s.inbound.responseDurationMs.With(inboundResponseLabels).Observe(latency) + } - observeHistogramVec( - randomLatencies(randomCount()), - s.requestDurationMs, - overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, false))) + // + // outbound + // + outboundRandomCount := int(rand.Float64() * 10) + + // split the deployment name into ["namespace", "deployment"] + dstList := strings.Split(deployment, "/") + outboundLabels := prom.Labels{"dst_namespace": dstList[0], "dst_deployment": dstList[1]} + + // outbound requests + s.outbound.requestTotals.With(outboundLabels).Add(float64(outboundRandomCount)) + for _, latency := range randomLatencies(outboundRandomCount) { + s.outbound.requestDurationMs.With(outboundLabels).Observe(latency) + } + + // outbound resposnes + outboundResponseLabels := outboundLabels + for k, v := range randomResponseLabels() { + outboundResponseLabels[k] = v + } + s.outbound.responseTotals.With(outboundResponseLabels).Add(float64(outboundRandomCount)) + for _, latency := range randomLatencies(outboundRandomCount) { + s.outbound.responseLatencyMs.With(outboundResponseLabels).Observe(latency) + } + for _, latency := range randomLatencies(outboundRandomCount) { + s.outbound.responseDurationMs.With(outboundResponseLabels).Observe(latency) + } } time.Sleep(s.sleep) } } -// newConduitLabel creates a label map to be used for metric generation. -func (s *simulatedProxy) newConduitLabel(destinationPod string, isResponseLabel bool) prom.Labels { - labelMap := prom.Labels{ - "direction": randomRequestDirection(), - "deployment": s.deploymentName, - "authority": "world.greeting:7778", - "namespace": s.namespace, - } - if labelMap["direction"] == "outbound" { - labelMap["dst_deployment"] = destinationPod - } - if isResponseLabel { - if rand.Intn(2) == 0 { - labelMap["grpc_status_code"] = fmt.Sprintf("%d", randomGrpcResponseCode()) - } else { - labelMap["status_code"] = fmt.Sprintf("%d", randomHttpResponseCode()) - } +func randomResponseLabels() prom.Labels { + labelMap := prom.Labels{"classification": "success"} + + grpcCode := randomGrpcResponseCode() + labelMap["grpc_status_code"] = fmt.Sprintf("%d", grpcCode) + + httpCode := randomHttpResponseCode() + labelMap["status_code"] = fmt.Sprintf("%d", httpCode) + + if grpcCode != uint32(codes.OK) || httpCode != http.StatusOK { + labelMap["classification"] = "fail" } return labelMap } -// observeHistogramVec uses a latencyBuckets slice which holds an array of numbers that indicate -// how many observations will be added to a each bucket. latencyBuckets and latencyBucketBounds -// both are of the same array length. ObserveHistogramVec selects a latencyBucketBound based on a position -// in the latencyBucket and then makes an observation within the selected bucket. -func observeHistogramVec(latencyBuckets []uint32, latencies *prom.HistogramVec, latencyLabels prom.Labels) { - for bucketNum, count := range latencyBuckets { - latencyMs := float64(latencyBucketBounds[bucketNum]) / 10 - for i := uint32(0); i < count; i++ { - latencies.With(latencyLabels).Observe(latencyMs) - } - } -} - -func randomRequestDirection() string { - if rand.Intn(2) == 0 { - return "inbound" - } - return "outbound" -} - -// overrideDefaultLabels combines two maps of the same size with the keys -// map1 values take precedence during the union -func overrideDefaultLabels(map1 map[string]string) map[string]string { - map2 := generateLabelMap(labels) - for k := range map2 { - map2[k] = map1[k] - } - return map2 -} - -func generateLabelMap(labels []string) map[string]string { - labelMap := make(map[string]string, len(labels)) - for _, label := range labels { - labelMap[label] = "" - } - return labelMap -} - -func randomCount() uint32 { - return uint32(rand.Int31n(100) + 1) -} - -func randomLatencies(count uint32) []uint32 { - latencies := make([]uint32, len(latencyBucketBounds)) - for i := uint32(0); i < count; i++ { - - // Randomly select a bucket to increment. - bucket := uint32(rand.Int31n(int32(len(latencies)))) - latencies[bucket]++ - } - return latencies -} - func randomGrpcResponseCode() uint32 { code := codes.OK if rand.Float32() > successRate { @@ -288,79 +230,173 @@ func randomHttpResponseCode() uint32 { return uint32(code) } +func randomLatencies(count int) []float64 { + latencies := make([]float64, count) + for i := 0; i < count; i++ { + // Select a latency from a bucket. + latencies[i] = latencyBucketBounds[rand.Int31n(int32(len(latencyBucketBounds)))] + } + return latencies +} + func podIndexFunc(obj interface{}) ([]string, error) { return nil, nil } -func filterDeployments(deployments []string, excludeDeployments map[string]struct{}) []string { +func filterDeployments(deployments []string, excludeDeployments map[string]struct{}, max int) []string { filteredDeployments := []string{} for _, deployment := range deployments { if _, ok := excludeDeployments[deployment]; !ok { filteredDeployments = append(filteredDeployments, deployment) + if len(filteredDeployments) == max { + break + } } } return filteredDeployments } -func newSimulatedProxy(podOwner string, deployments []string, sleep *time.Duration) *simulatedProxy { - podOwnerComponents := strings.Split(podOwner, "/") - name := podOwnerComponents[1] - namespace := podOwnerComponents[0] +func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.ReplicaSetStore, sleep *time.Duration, maxDst int) *simulatedProxy { + ownerInfo, err := replicaSets.GetDeploymentForPod(&pod) + if err != nil { + log.Fatal(err.Error()) + } + // GetDeploymentForPod returns "namespace/deployment" + deploymentName := strings.Split(ownerInfo, "/")[1] + dstDeployments := filterDeployments(deployments, map[string]struct{}{deploymentName: {}}, maxDst) + + constLabels := prom.Labels{ + "authority": "fakeauthority:123", + "namespace": pod.GetNamespace(), + "deployment": deploymentName, + "pod_template_hash": pod.GetLabels()["pod-template-hash"], + "pod_name": pod.GetName(), + + // TODO: support other k8s objects + // "daemon_set", + // "k8s_job", + // "replication_controller", + // "replica_set", + } + + requestLabels := []string{ + "direction", + + // outbound only + "dst_namespace", + "dst_deployment", + + // TODO: support other k8s dst objects + // "dst_daemon_set", + // "dst_job", + // "dst_replication_controller", + // "dst_replica_set", + } + + responseLabels := append( + requestLabels, + []string{ + "classification", + "grpc_status_code", + "status_code", + }..., + ) + + proxyMetrics := proxyMetricCollectors{ + requestTotals: prom.NewCounterVec( + prom.CounterOpts{ + Name: "request_total", + Help: "A counter of the number of requests the proxy has received", + ConstLabels: constLabels, + }, requestLabels), + responseTotals: prom.NewCounterVec( + prom.CounterOpts{ + Name: "response_total", + Help: "A counter of the number of responses the proxy has received", + ConstLabels: constLabels, + }, responseLabels), + requestDurationMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "request_duration_ms", + Help: "A histogram of the duration of a request", + ConstLabels: constLabels, + Buckets: latencyBucketBounds, + }, requestLabels), + responseLatencyMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "response_latency_ms", + Help: "A histogram of the total latency of a response", + ConstLabels: constLabels, + Buckets: latencyBucketBounds, + }, responseLabels), + responseDurationMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "response_duration_ms", + Help: "A histogram of the duration of a response", + ConstLabels: constLabels, + Buckets: latencyBucketBounds, + }, responseLabels), + } + + inboundLabels := prom.Labels{ + "direction": "inbound", + + // dst_* labels are not valid for inbound, but all labels must always be set + // in every increment call, so we set these to empty for all inbound metrics. + "dst_namespace": "", + "dst_deployment": "", + } + + outboundLables := prom.Labels{ + "direction": "outbound", + } proxy := simulatedProxy{ - podOwner: podOwner, - sleep: *sleep, - deployments: deployments, - namespace: namespace, - deploymentName: name, - registerer: prom.NewRegistry(), - proxyMetricCollectors: &proxyMetricCollectors{ - requestTotals: prom.NewCounterVec( - prom.CounterOpts{ - Name: "request_total", - Help: "A counter of the number of requests the proxy has received", - }, labels), - responseTotals: prom.NewCounterVec( - prom.CounterOpts{ - Name: "response_total", - Help: "A counter of the number of responses the proxy has received.", - }, labels), - requestDurationMs: prom.NewHistogramVec( - prom.HistogramOpts{ - Name: "request_duration_ms", - Help: "A histogram of the duration of a response", - Buckets: latencyBucketBounds, - }, labels), - responseLatencyMs: prom.NewHistogramVec( - prom.HistogramOpts{ - Name: "response_latency_ms", - Help: "A histogram of the total latency of a response.", - Buckets: latencyBucketBounds, - }, labels), + sleep: *sleep, + deployments: dstDeployments, + registerer: prom.NewRegistry(), + inbound: &proxyMetricCollectors{ + requestTotals: proxyMetrics.requestTotals.MustCurryWith(inboundLabels), + responseTotals: proxyMetrics.responseTotals.MustCurryWith(inboundLabels), + requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + }, + outbound: &proxyMetricCollectors{ + requestTotals: proxyMetrics.requestTotals.MustCurryWith(outboundLables), + responseTotals: proxyMetrics.responseTotals.MustCurryWith(outboundLables), + requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(outboundLables).(*prom.HistogramVec), + responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(outboundLables).(*prom.HistogramVec), + responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(outboundLables).(*prom.HistogramVec), }, } proxy.registerer.MustRegister( - proxy.requestTotals, - proxy.responseTotals, - proxy.requestDurationMs, - proxy.responseLatencyMs, + proxyMetrics.requestTotals, + proxyMetrics.responseTotals, + proxyMetrics.requestDurationMs, + proxyMetrics.responseLatencyMs, + proxyMetrics.responseDurationMs, ) return &proxy } -func getDeployments(podList []*v1.Pod, deploys *k8s.ReplicaSetStore, maxPods *int) []string { +func getK8sObjects(podList []*v1.Pod, replicaSets *k8s.ReplicaSetStore, maxPods int) ([]*v1.Pod, []string) { allPods := make([]*v1.Pod, 0) deploymentSet := make(map[string]struct{}) for _, pod := range podList { - if pod.Status.PodIP != "" && !strings.HasPrefix(pod.GetNamespace(), "kube-") && (*maxPods == 0 || len(allPods) < *maxPods) { + if pod.Status.PodIP != "" && !strings.HasPrefix(pod.GetNamespace(), "kube-") { allPods = append(allPods, pod) - deploymentName, err := deploys.GetDeploymentForPod(pod) + deploymentName, err := replicaSets.GetDeploymentForPod(pod) if err != nil { log.Fatal(err.Error()) } deploymentSet[deploymentName] = struct{}{} + + if maxPods != 0 && len(allPods) == maxPods { + break + } } } @@ -368,14 +404,13 @@ func getDeployments(podList []*v1.Pod, deploys *k8s.ReplicaSetStore, maxPods *in for deployment := range deploymentSet { deployments = append(deployments, deployment) } - return deployments + return allPods, deployments } func main() { rand.Seed(time.Now().UnixNano()) sleep := flag.Duration("sleep", time.Second, "time to sleep between requests") - metricsAddrs := flag.String("metric-addrs", ":9000,:9001,:9002", "range of network addresses to serve prometheus metrics") - maxPods := flag.Int("max-pods", 0, "total number of pods to simulate (default unlimited)") + metricsPorts := flag.String("metric-ports", "10000-10002", "range (inclusive) of network ports to serve prometheus metrics") kubeConfigPath := flag.String("kubeconfig", "", "path to kube config - required") flag.Parse() @@ -384,6 +419,19 @@ func main() { return } + ports := strings.Split(*metricsPorts, "-") + if len(ports) != 2 { + log.Fatalf("Invalid metric-ports flag, must be of the form '[start]-[end]': %s", *metricsPorts) + } + startPort, err := strconv.Atoi(ports[0]) + if err != nil { + log.Fatalf("Invalid start port, must be an integer: %s", ports[0]) + } + endPort, err := strconv.Atoi(ports[1]) + if err != nil { + log.Fatalf("Invalid end port, must be an integer: %s", ports[1]) + } + clientSet, err := k8s.NewClientSet(*kubeConfigPath) if err != nil { log.Fatal(err.Error()) @@ -394,7 +442,7 @@ func main() { log.Fatal(err.Error()) } - deploys, err := k8s.NewReplicaSetStore(clientSet) + replicaSets, err := k8s.NewReplicaSetStore(clientSet) if err != nil { log.Fatal(err.Error()) } @@ -404,7 +452,7 @@ func main() { log.Fatal(err.Error()) } - err = deploys.Run() + err = replicaSets.Run() if err != nil { log.Fatal(err.Error()) } @@ -414,21 +462,29 @@ func main() { log.Fatal(err.Error()) } - deployments := getDeployments(podList, deploys, maxPods) + proxyCount := endPort - startPort + 1 + simulatedPods, deployments := getK8sObjects(podList, replicaSets, proxyCount) + podsFound := len(simulatedPods) + if podsFound < proxyCount { + log.Warnf("Found only %d pods to simulate %d proxies, creating %d fake pods.", podsFound, proxyCount, proxyCount-podsFound) + for i := 0; i < proxyCount-podsFound; i++ { + pod := simulatedPods[i%podsFound].DeepCopy() + name := fmt.Sprintf("%s-fake-%d", pod.GetName(), i) + pod.SetName(name) + simulatedPods = append(simulatedPods, pod) + } + } stopCh := make(chan os.Signal) signal.Notify(stopCh, os.Interrupt, os.Kill) - ownedDeployments := map[string]struct{}{} + // simulate network topology of N * sqrt(N) request paths + maxDst := int(math.Sqrt(float64(len(deployments)))) + 1 - for _, addr := range strings.Split(*metricsAddrs, ",") { - unowned := filterDeployments(deployments, ownedDeployments) - randomPodOwner := unowned[rand.Intn(len(unowned))] - ownedDeployments[randomPodOwner] = struct{}{} + for port := startPort; port <= endPort; port++ { + proxy := newSimulatedProxy(*simulatedPods[port-startPort], deployments, replicaSets, sleep, maxDst) - dstDeployments := filterDeployments(deployments, map[string]struct{}{randomPodOwner: {}}) - - proxy := newSimulatedProxy(randomPodOwner, dstDeployments, sleep) + addr := fmt.Sprintf("0.0.0.0:%d", port) server := &http.Server{ Addr: addr, Handler: promhttp.HandlerFor(proxy.registerer, promhttp.HandlerOpts{}), diff --git a/docker-compose.yml b/docker-compose.yml index a862590c2..8f7a4d097 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -123,7 +123,7 @@ services: simulate-proxy: image: golang:1.10.0-alpine3.7 ports: - - 9000-9009:9000-9009 + - 10000-10009:10000-10009 volumes: - .:/go/src/github.com/runconduit/conduit - ~/.kube/config:/kubeconfig:ro @@ -131,7 +131,6 @@ services: - go - run - /go/src/github.com/runconduit/conduit/controller/script/simulate-proxy/main.go - - --metric-addrs=0.0.0.0:9000,0.0.0.0:9001,0.0.0.0:9002,0.0.0.0:9003,0.0.0.0:9004,0.0.0.0:9005,0.0.0.0:9006,0.0.0.0:9007,0.0.0.0:9008,0.0.0.0:9009 + - --metric-ports=10000-10009 - --kubeconfig=/kubeconfig - - --max-pods=10 - --sleep=3s