diff --git a/.prometheus.dev.yml b/.prometheus.dev.yml index d1af99c90..a336de66f 100644 --- a/.prometheus.dev.yml +++ b/.prometheus.dev.yml @@ -31,3 +31,10 @@ scrape_configs: - job_name: 'destination' static_configs: - targets: ['destination:9999'] + + - job_name: 'conduit' + static_configs: + - targets: + - 'simulate-proxy:9000' + - 'simulate-proxy:9001' + - 'simulate-proxy:9002' diff --git a/BUILD.md b/BUILD.md index 97566850e..c6ac7ac90 100644 --- a/BUILD.md +++ b/BUILD.md @@ -167,11 +167,11 @@ traffic to the docker-compose environment: ```bash # confirm you are connected to Kubernetes kubectl version - -# simulate traffic -bin/go-run controller/script/simulate-proxy --kubeconfig ~/.kube/config --addr $DOCKER_IP:8086 --max-pods 10 --sleep 10ms ``` +Note that the Kubernetes cluster your system is configured to talk to must not be referenced via +`localhost` in your Kubernetes config file, as `simulate-proxy` will not be able to connect to it. + This includes Kubernetes on Docker For Mac. 
### Testing ```bash diff --git a/controller/script/simulate-proxy/main.go b/controller/script/simulate-proxy/main.go index b7baee979..1e15902fb 100644 --- a/controller/script/simulate-proxy/main.go +++ b/controller/script/simulate-proxy/main.go @@ -1,20 +1,19 @@ package main import ( - "context" "flag" + "fmt" "math" "math/rand" "net/http" - "strconv" + "os" + "os/signal" "strings" "time" - "github.com/runconduit/conduit/controller/api/proxy" - common "github.com/runconduit/conduit/controller/gen/common" - pb "github.com/runconduit/conduit/controller/gen/proxy/telemetry" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/runconduit/conduit/controller/k8s" - "github.com/runconduit/conduit/controller/util" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "k8s.io/api/core/v1" @@ -22,9 +21,27 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" ) -/* A simple script for posting simulated telemetry data to the proxy api */ +/* A simple script for exposing simulated prometheus metrics */ + +type simulatedProxy struct { + podOwner string + deploymentName string + sleep time.Duration + namespace string + deployments []string + registerer *prom.Registry + *proxyMetricCollectors +} + +type proxyMetricCollectors struct { + requestTotals *prom.CounterVec + responseTotals *prom.CounterVec + requestDurationMs *prom.HistogramVec + responseLatencyMs *prom.HistogramVec +} var ( + labels = generatePromLabels() grpcResponseCodes = []codes.Code{ codes.OK, codes.PermissionDenied, @@ -93,23 +110,20 @@ var ( http.StatusNetworkAuthenticationRequired, } - ports = []uint32{3333, 6262} - // latencyBucketBounds holds the maximum value (inclusive, in tenths of a // millisecond) that may be counted in a given histogram bucket. 
- // These values are one order of magnitude greater than the controller's // Prometheus buckets, because the proxy will reports latencies in tenths // of a millisecond rather than whole milliseconds. - latencyBucketBounds = [26]uint32{ + latencyBucketBounds = []float64{ // prometheus.LinearBuckets(1, 1, 5), 10, 20, 30, 40, 50, // prometheus.LinearBuckets(10, 10, 5), - 100, 200, 300, 400, 50, + 100, 200, 300, 400, 500, // prometheus.LinearBuckets(100, 100, 5), 1000, 2000, 3000, 4000, 5000, // prometheus.LinearBuckets(1000, 1000, 5), - 10000, 20000, 30000, 40000, 5000, + 10000, 20000, 30000, 40000, 50000, // prometheus.LinearBuckets(10000, 10000, 5), 100000, 200000, 300000, 400000, 500000, // Prometheus implicitly creates a max bucket for everything that @@ -119,8 +133,122 @@ var ( } ) -func randomPort() uint32 { - return ports[rand.Intn(len(ports))] +// generateProxyTraffic randomly creates metrics under the guise of a single conduit proxy routing traffic. +// metrics are generated for each proxyMetricCollector. +func (s *simulatedProxy) generateProxyTraffic() { + + for { + inboundRandomCount := int(rand.Float64() * 10) + outboundRandomCount := int(rand.Float64() * 10) + deployment := getRandomDeployment(s.deployments, map[string]struct{}{s.podOwner: {}}) + + //split the deployment name into ["namespace", "deployment"] + destinationDeploymentName := strings.Split(deployment, "/")[1] + + s.requestTotals. + With(overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, false))). + Add(float64(inboundRandomCount)) + + s.responseTotals. + With(overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, true))). 
+			Add(float64(outboundRandomCount))
+
+		observeHistogramVec(
+			randomLatencies(randomCount()),
+			s.responseLatencyMs,
+			overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, true)))
+
+		observeHistogramVec(
+			randomLatencies(randomCount()),
+			s.requestDurationMs,
+			overrideDefaultLabels(s.newConduitLabel(destinationDeploymentName, false)))
+
+		time.Sleep(s.sleep)
+	}
+}
+
+// newConduitLabel creates a label map to be used for metric generation.
+func (s *simulatedProxy) newConduitLabel(destinationPod string, isResponseLabel bool) prom.Labels {
+	labelMap := prom.Labels{
+		"direction":  randomRequestDirection(),
+		"deployment": s.deploymentName,
+		"authority":  "world.greeting:7778",
+		"namespace":  s.namespace,
+	}
+	if labelMap["direction"] == "outbound" {
+		labelMap["dst_deployment"] = destinationPod
+	}
+	if isResponseLabel {
+		if rand.Intn(2) == 0 {
+			labelMap["grpc_status_code"] = fmt.Sprintf("%d", randomGrpcResponseCode())
+		} else {
+			labelMap["status_code"] = fmt.Sprintf("%d", randomHttpResponseCode())
+		}
+	}
+
+	return labelMap
+}
+
+// observeHistogramVec uses a latencyBuckets slice which holds an array of numbers that indicate
+// how many observations will be added to each bucket. latencyBuckets and latencyBucketBounds
+// are both of the same length. observeHistogramVec selects a latencyBucketBound based on a position
+// in the latencyBucket and then makes an observation within the selected bucket.
+func observeHistogramVec(latencyBuckets []uint32, latencies *prom.HistogramVec, latencyLabels prom.Labels) {
+	for bucketNum, count := range latencyBuckets {
+		latencyMs := float64(latencyBucketBounds[bucketNum]) / 10
+		for i := uint32(0); i < count; i++ {
+			latencies.With(latencyLabels).Observe(latencyMs)
+		}
+	}
+}
+
+func randomRequestDirection() string {
+	if rand.Intn(2) == 0 {
+		return "inbound"
+	}
+	return "outbound"
+}
+
+func generatePromLabels() []string {
+	kubeResourceTypes := []string{
+		"job",
+		"replica_set",
+		"deployment",
+		"daemon_set",
+		"replication_controller",
+		"namespace",
+	}
+	constantLabels := []string{
+		"direction",
+		"authority",
+		"status_code",
+		"grpc_status_code",
+	}
+
+	destinationLabels := make([]string, len(kubeResourceTypes))
+
+	for i, label := range kubeResourceTypes {
+		destinationLabels[i] = fmt.Sprintf("dst_%s", label)
+	}
+	return append(append(constantLabels, kubeResourceTypes...), destinationLabels...)
+}
+
+// overrideDefaultLabels returns a label map covering every known label, with
+// map1's values taking precedence and all other labels defaulting to "".
+func overrideDefaultLabels(map1 map[string]string) map[string]string {
+	map2 := generateLabelMap(labels)
+	for k := range map2 {
+		map2[k] = map1[k]
+	}
+	return map2
+}
+
+func generateLabelMap(labels []string) map[string]string {
+	labelMap := make(map[string]string, len(labels))
+	for _, label := range labels {
+		labelMap[label] = ""
+	}
+	return labelMap
 }
 
 func randomCount() uint32 {
@@ -138,30 +266,6 @@ func randomLatencies(count uint32) []uint32 {
 	return latencies
 }
 
-func randomGrpcEos(count uint32) (eos []*pb.EosScope) {
-	grpcResponseCodes := make(map[uint32]uint32)
-	for i := uint32(0); i < count; i++ {
-		grpcResponseCodes[randomGrpcResponseCode()] += 1
-	}
-	for code, streamCount := range grpcResponseCodes {
-		eos = append(eos, &pb.EosScope{
-			Ctx:     &pb.EosCtx{End: &pb.EosCtx_GrpcStatusCode{GrpcStatusCode: code}},
-			Streams: streamCount,
-		})
-	}
-	return
-}
-
-func
randomH2Eos(count uint32) (eos []*pb.EosScope) { - for i := uint32(0); i < count; i++ { - eos = append(eos, &pb.EosScope{ - Ctx: &pb.EosCtx{End: &pb.EosCtx_Other{Other: true}}, - Streams: uint32(rand.Int31()), - }) - } - return -} - func randomGrpcResponseCode() uint32 { return uint32(grpcResponseCodes[rand.Intn(len(grpcResponseCodes))]) } @@ -170,44 +274,94 @@ func randomHttpResponseCode() uint32 { return uint32(httpResponseCodes[rand.Intn(len(httpResponseCodes))]) } -func stringToIp(str string) *common.IPAddress { - octets := make([]uint8, 0) - for _, num := range strings.Split(str, ".") { - oct, _ := strconv.Atoi(num) - octets = append(octets, uint8(oct)) - } - return util.IPV4(octets[0], octets[1], octets[2], octets[3]) -} - func podIndexFunc(obj interface{}) ([]string, error) { return nil, nil } -func randomPod(pods []*v1.Pod, prvPodIp *common.IPAddress) *common.IPAddress { - var podIp *common.IPAddress - for { - if podIp != nil { - break - } +func getRandomDeployment(deployments []string, excludeDeployments map[string]struct{}) string { + filteredDeployments := make([]string, 0) - randomPod := pods[rand.Intn(len(pods))] - if strings.HasPrefix(randomPod.GetNamespace(), "kube-") { - continue // skip pods in the kube-* namespaces - } - podIp = stringToIp(randomPod.Status.PodIP) - if prvPodIp != nil && podIp.GetIpv4() == prvPodIp.GetIpv4() { - podIp = nil + for _, deployment := range deployments { + if _, ok := excludeDeployments[deployment]; !ok { + filteredDeployments = append(filteredDeployments, deployment) } } - return podIp + return filteredDeployments[rand.Intn(len(filteredDeployments))] + +} + +func newSimulatedProxy(podOwner string, deployments []string, sleep *time.Duration) *simulatedProxy { + podOwnerComponents := strings.Split(podOwner, "/") + name := podOwnerComponents[1] + namespace := podOwnerComponents[0] + + proxy := simulatedProxy{ + podOwner: podOwner, + sleep: *sleep, + deployments: deployments, + namespace: namespace, + deploymentName: name, 
+ registerer: prom.NewRegistry(), + proxyMetricCollectors: &proxyMetricCollectors{ + requestTotals: prom.NewCounterVec( + prom.CounterOpts{ + Name: "request_total", + Help: "A counter of the number of requests the proxy has received", + }, labels), + responseTotals: prom.NewCounterVec( + prom.CounterOpts{ + Name: "response_total", + Help: "A counter of the number of responses the proxy has received.", + }, labels), + requestDurationMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "request_duration_ms", + Help: "A histogram of the duration of a response", + Buckets: latencyBucketBounds, + }, labels), + responseLatencyMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "response_latency_ms", + Help: "A histogram of the total latency of a response.", + Buckets: latencyBucketBounds, + }, labels), + }, + } + + proxy.registerer.MustRegister( + proxy.requestTotals, + proxy.responseTotals, + proxy.requestDurationMs, + proxy.responseLatencyMs, + ) + return &proxy +} + +func getDeployments(podList []*v1.Pod, deploys *k8s.ReplicaSetStore, maxPods *int) []string { + allPods := make([]*v1.Pod, 0) + deploymentSet := make(map[string]struct{}) + for _, pod := range podList { + if pod.Status.PodIP != "" && !strings.HasPrefix(pod.GetNamespace(), "kube-") && (*maxPods == 0 || len(allPods) < *maxPods) { + allPods = append(allPods, pod) + deploymentName, err := deploys.GetDeploymentForPod(pod) + if err != nil { + log.Fatal(err.Error()) + } + deploymentSet[deploymentName] = struct{}{} + } + } + + deployments := make([]string, 0) + for deployment := range deploymentSet { + deployments = append(deployments, deployment) + } + return deployments } func main() { rand.Seed(time.Now().UnixNano()) - - addr := flag.String("addr", ":8086", "address of proxy api") - requestCount := flag.Int("requests", 0, "number of api requests to make (default: infinite)") sleep := flag.Duration("sleep", time.Second, "time to sleep between requests") + metricsAddrs := flag.String("metric-addrs", 
":9000,:9001,:9002", "range of network addresses to serve prometheus metrics") maxPods := flag.Int("max-pods", 0, "total number of pods to simulate (default unlimited)") kubeConfigPath := flag.String("kubeconfig", "", "path to kube config - required") flag.Parse() @@ -217,12 +371,6 @@ func main() { return } - client, conn, err := proxy.NewTelemetryClient(*addr) - if err != nil { - log.Fatal(err.Error()) - } - defer conn.Close() - clientSet, err := k8s.NewClientSet(*kubeConfigPath) if err != nil { log.Fatal(err.Error()) @@ -233,120 +381,49 @@ func main() { log.Fatal(err.Error()) } + deploys, err := k8s.NewReplicaSetStore(clientSet) + if err != nil { + log.Fatal(err.Error()) + } + err = pods.Run() if err != nil { log.Fatal(err.Error()) } + err = deploys.Run() + if err != nil { + log.Fatal(err.Error()) + } + podList, err := pods.List() if err != nil { log.Fatal(err.Error()) } - allPods := make([]*v1.Pod, 0) - for _, pod := range podList { - if pod.Status.PodIP != "" && (*maxPods == 0 || len(allPods) < *maxPods) { - allPods = append(allPods, pod) - } - } - - for i := 0; (*requestCount == 0) || (i < *requestCount); i++ { - count := randomCount() - sourceIp := randomPod(allPods, nil) - targetIp := randomPod(allPods, sourceIp) - - req := &pb.ReportRequest{ - Process: &pb.Process{ - ScheduledInstance: "hello-1mfa0", - ScheduledNamespace: "people", - }, - ClientTransports: []*pb.ClientTransport{ - // TCP - &pb.ClientTransport{ - TargetAddr: &common.TcpAddress{ - Ip: targetIp, - Port: randomPort(), - }, - Connects: count, - Disconnects: []*pb.TransportSummary{ - &pb.TransportSummary{ - DurationMs: uint64(randomCount()), - BytesSent: uint64(randomCount()), - }, - }, - Protocol: common.Protocol_TCP, - }, - }, - ServerTransports: []*pb.ServerTransport{ - // TCP - &pb.ServerTransport{ - SourceIp: sourceIp, - Connects: count, - Disconnects: []*pb.TransportSummary{ - &pb.TransportSummary{ - DurationMs: uint64(randomCount()), - BytesSent: uint64(randomCount()), - }, - }, - 
Protocol: common.Protocol_TCP, - }, - }, - Proxy: pb.ReportRequest_INBOUND, - Requests: []*pb.RequestScope{ - - // gRPC - &pb.RequestScope{ - Ctx: &pb.RequestCtx{ - SourceIp: sourceIp, - TargetAddr: &common.TcpAddress{ - Ip: targetIp, - Port: randomPort(), - }, - Authority: "world.greeting:7778", - }, - Count: count, - Responses: []*pb.ResponseScope{ - &pb.ResponseScope{ - Ctx: &pb.ResponseCtx{ - HttpStatusCode: http.StatusOK, - }, - ResponseLatencyCounts: randomLatencies(count), - Ends: randomGrpcEos(count), - }, - }, - }, - - // HTTP/2 - &pb.RequestScope{ - Ctx: &pb.RequestCtx{ - SourceIp: sourceIp, - TargetAddr: &common.TcpAddress{ - Ip: targetIp, - Port: randomPort(), - }, - Authority: "world.greeting:7778", - }, - Count: count, - Responses: []*pb.ResponseScope{ - &pb.ResponseScope{ - Ctx: &pb.ResponseCtx{ - HttpStatusCode: randomHttpResponseCode(), - }, - ResponseLatencyCounts: randomLatencies(count), - Ends: randomH2Eos(count), - }, - }, - }, - }, - - HistogramBucketBoundsTenthMs: latencyBucketBounds[:], - } - - _, err = client.Report(context.Background(), req) - if err != nil { - log.Fatal(err.Error()) - } - - time.Sleep(*sleep) + deployments := getDeployments(podList, deploys, maxPods) + + stopCh := make(chan os.Signal) + signal.Notify(stopCh, os.Interrupt, os.Kill) + + excludedDeployments := map[string]struct{}{} + + for _, addr := range strings.Split(*metricsAddrs, ",") { + randomPodOwner := getRandomDeployment(deployments, excludedDeployments) + excludedDeployments[randomPodOwner] = struct{}{} + + go func(address string, podOwner string, deployments []string) { + + proxy := newSimulatedProxy(podOwner, deployments, sleep) + server := &http.Server{ + Addr: address, + Handler: promhttp.HandlerFor(proxy.registerer, promhttp.HandlerOpts{}), + } + log.Infof("serving scrapable metrics on %s", address) + go server.ListenAndServe() + go proxy.generateProxyTraffic() + + }(addr, randomPodOwner, deployments) } + <-stopCh } diff --git a/docker-compose.yml 
b/docker-compose.yml index 4e9b7fb63..1c0428750 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -110,3 +110,19 @@ services: command: - --config.file=/etc/prometheus/prometheus.yml - --storage.tsdb.retention=6h + + simulate-proxy: + image: golang:1.10.0-alpine3.7 + ports: + - 9000-9002:9000-9002 + volumes: + - .:/go/src/github.com/runconduit/conduit + - ~/.kube/config:/kubeconfig:ro + command: + - go + - run + - /go/src/github.com/runconduit/conduit/controller/script/simulate-proxy/main.go + - --metric-addrs=0.0.0.0:9000,0.0.0.0:9001,0.0.0.0:9002 + - --kubeconfig=/kubeconfig + - --max-pods=10 + - --sleep=3s