From 6eec6256f784b390bf542d27f09de80ccd8ea705 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 19 Apr 2018 15:18:43 -0700 Subject: [PATCH] Add transport-level metrics to simulate-proxy (#811) This PR adds the transport-level metrics described in #742 to the `simulate-proxy` script. This will be useful while adding these metrics to the Grafana dashboard and/or CLI. Closes #793 --- controller/script/simulate-proxy/main.go | 247 +++++++++++++++++++++-- 1 file changed, 231 insertions(+), 16 deletions(-) diff --git a/controller/script/simulate-proxy/main.go b/controller/script/simulate-proxy/main.go index cb759400a..c743766f1 100644 --- a/controller/script/simulate-proxy/main.go +++ b/controller/script/simulate-proxy/main.go @@ -33,11 +33,19 @@ type simulatedProxy struct { } type proxyMetricCollectors struct { - requestTotals *prom.CounterVec - responseTotals *prom.CounterVec - requestDurationMs *prom.HistogramVec - responseLatencyMs *prom.HistogramVec - responseDurationMs *prom.HistogramVec + requestTotals *prom.CounterVec + responseTotals *prom.CounterVec + requestDurationMs *prom.HistogramVec + responseLatencyMs *prom.HistogramVec + responseDurationMs *prom.HistogramVec + tcpAcceptOpenTotal *prom.CounterVec + tcpAcceptCloseTotal *prom.CounterVec + tcpConnectOpenTotal *prom.CounterVec + tcpConnectCloseTotal *prom.CounterVec + tcpConnectionsOpen *prom.GaugeVec + tcpConnectionDurationMs *prom.HistogramVec + receivedBytes *prom.CounterVec + sentBytes *prom.CounterVec } const ( @@ -165,6 +173,8 @@ func (s *simulatedProxy) generateProxyTraffic() { s.inbound.responseDurationMs.With(inboundResponseLabels).Observe(latency) } + s.inbound.generateTCPStats(inboundRandomCount) + // // outbound // @@ -192,12 +202,114 @@ func (s *simulatedProxy) generateProxyTraffic() { for _, latency := range randomLatencies(outboundRandomCount) { s.outbound.responseDurationMs.With(outboundResponseLabels).Observe(latency) } + + s.outbound.generateTCPStats(outboundRandomCount) } time.Sleep(s.sleep) } } +func (p *proxyMetricCollectors) generateTCPStats(randomCount int) { + logCtx := log.WithFields(log.Fields{"randomCount": randomCount}) + if randomCount <= 0 { + logCtx.Debugln("generateTCPStats: randomCount <= 0; skipping") + return + } + + // TODO: throw in some raw TCP connections + openLabels := prom.Labels{"protocol": "http"} + closeLabels := prom.Labels{"protocol": "http", "classification": "success"} + failLabels := prom.Labels{"protocol": "http", "classification": "failure"} + + // jitter the accept/connect counts a little bit to simulate connection pooling etc. + acceptCount := jitter(randomCount, 0.1) + + p.tcpAcceptOpenTotal.With(openLabels).Add(float64(acceptCount)) + + // up to acceptCount accepted connections remain open... + acceptOpenCount := rand.Intn(acceptCount) + // ...and the rest have been closed. + acceptClosedCount := acceptCount - acceptOpenCount + + // simulate some failures + acceptFailedCount := 0 + if acceptClosedCount >= 2 { + acceptFailedCount = rand.Intn(acceptClosedCount / 2) + acceptClosedCount -= acceptFailedCount + p.tcpAcceptCloseTotal.With(failLabels).Add(float64(acceptFailedCount)) + } + + p.tcpAcceptCloseTotal.With(closeLabels).Add(float64(acceptClosedCount)) + + connectCount := jitter(randomCount, 0.1) + + p.tcpConnectOpenTotal.With(openLabels).Add(float64(connectCount)) + + connectOpenCount := rand.Intn(connectCount) + connectClosedCount := connectCount - connectOpenCount + connectFailedCount := 0 + + if connectClosedCount >= 2 { + connectFailedCount = rand.Intn(connectClosedCount / 2) + connectClosedCount -= connectFailedCount + p.tcpConnectCloseTotal.With(failLabels).Add(float64(connectFailedCount)) + } + + p.tcpConnectCloseTotal.With(closeLabels).Add(float64(connectClosedCount)) + + p.tcpConnectionsOpen.With(openLabels).Set(float64(acceptOpenCount + connectOpenCount)) + + // connect durations + bytes sent/received + totalClosed := acceptClosedCount + connectClosedCount + for _, latency := range randomLatencies(totalClosed) { + p.tcpConnectionDurationMs.With(closeLabels).Observe(latency) + // XXX: are these reasonable values for sent/received bytes? + p.sentBytes.With(closeLabels).Add(float64(rand.Intn(50000) + 1024)) + p.receivedBytes.With(closeLabels).Add(float64(rand.Intn(50000) + 1024)) + } + + // durations for simulated failures + totalFailed := acceptFailedCount + connectFailedCount + for _, latency := range randomLatencies(totalFailed) { + p.tcpConnectionDurationMs.With(failLabels).Observe(latency) + // XXX: are these reasonable values for sent/received bytes? + p.sentBytes.With(failLabels).Add(float64(rand.Intn(50000))) + p.receivedBytes.With(failLabels).Add(float64(rand.Intn(50000))) + } + +} + +func jitter(toJitter int, frac float64) int { + logCtx := log.WithFields(log.Fields{ + "toJitter": toJitter, + "frac": frac, + }) + if toJitter <= 0 { + logCtx.Debugln("jitter(): toJitter <= 0; returning 0") + return 0 + } + + sign := rand.Intn(2) + if sign == 0 { + sign = -1 + } + + amount := int(float64(toJitter)*frac) + 1 + jitter := rand.Intn(amount) * sign + jittered := toJitter + jitter + + if jittered <= 0 { + logCtx.WithFields(log.Fields{ + "amount": amount, + "jitter": jitter, + "jittered": jittered, + }).Debugln("jitter(): jittered <= 0; returning 1") + return 1 + } + return jittered +} + func randomResponseLabels() prom.Labels { labelMap := prom.Labels{"classification": "success"} @@ -266,6 +378,20 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic deploymentName := strings.Split(ownerInfo, "/")[1] dstDeployments := filterDeployments(deployments, map[string]struct{}{deploymentName: {}}, maxDst) + constTCPLabels := prom.Labels{ + // TCP metrics won't be labeled with an authority. + "namespace": pod.GetNamespace(), + "deployment": deploymentName, + "pod_template_hash": pod.GetLabels()["pod-template-hash"], + "pod": pod.GetName(), + + // TODO: support other k8s objects + // "daemon_set", + // "k8s_job", + // "replication_controller", + // "replica_set", + } + constLabels := prom.Labels{ "authority": "fakeauthority:123", "namespace": pod.GetNamespace(), @@ -278,6 +404,7 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic // "k8s_job", // "replication_controller", // "replica_set", + } requestLabels := []string{ @@ -303,6 +430,16 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic }..., ) + tcpLabels := []string{ + "direction", + "protocol", + } + + tcpCloseLabels := append( + tcpLabels, + []string{"classification"}..., + ) + proxyMetrics := proxyMetricCollectors{ requestTotals: prom.NewCounterVec( prom.CounterOpts{ @@ -337,6 +474,55 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic ConstLabels: constLabels, Buckets: latencyBucketBounds, }, responseLabels), + tcpAcceptOpenTotal: prom.NewCounterVec( + prom.CounterOpts{ + Name: "tcp_accept_open_total", + Help: "A counter of the total number of transport connections which have been accepted by the proxy.", + ConstLabels: constTCPLabels, + }, tcpLabels), + tcpAcceptCloseTotal: prom.NewCounterVec( + prom.CounterOpts{ + Name: "tcp_accept_close_total", + Help: "A counter of the total number of transport connections accepted by the proxy which have been closed.", + ConstLabels: constTCPLabels, + }, tcpCloseLabels), + tcpConnectOpenTotal: prom.NewCounterVec( + prom.CounterOpts{ + Name: "tcp_connect_open_total", + Help: "A counter of the total number of transport connections which have been opened by the proxy.", + ConstLabels: constTCPLabels, + }, tcpLabels), + tcpConnectCloseTotal: prom.NewCounterVec( + prom.CounterOpts{ + Name: "tcp_connect_close_total", + Help: "A counter of the total number of transport connections opened by the proxy which have been closed.", + ConstLabels: constTCPLabels, + }, tcpCloseLabels), + tcpConnectionsOpen: prom.NewGaugeVec( + prom.GaugeOpts{ + Name: "tcp_connections_open", + Help: "A gauge of the number of transport connections currently open.", + ConstLabels: constTCPLabels, + }, tcpLabels), + tcpConnectionDurationMs: prom.NewHistogramVec( + prom.HistogramOpts{ + Name: "tcp_connection_duration_ms", + Help: "A histogram of the duration of the lifetime of a connection, in milliseconds.", + ConstLabels: constTCPLabels, + Buckets: latencyBucketBounds, + }, tcpCloseLabels), + sentBytes: prom.NewCounterVec( + prom.CounterOpts{ + Name: "sent_bytes", + Help: "A counter of the total number of sent bytes.", + ConstLabels: constTCPLabels, + }, tcpCloseLabels), + receivedBytes: prom.NewCounterVec( + prom.CounterOpts{ + Name: "received_bytes", + Help: "A counter of the total number of recieved bytes.", + ConstLabels: constTCPLabels, + }, tcpCloseLabels), } inboundLabels := prom.Labels{ @@ -348,7 +534,12 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic "dst_deployment": "", } - outboundLables := prom.Labels{ + // TCP stats don't have dst labels + inboundTCPLabels := prom.Labels{ + "direction": "inbound", + } + + outboundLabels := prom.Labels{ "direction": "outbound", } @@ -357,18 +548,34 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic deployments: dstDeployments, registerer: prom.NewRegistry(), inbound: &proxyMetricCollectors{ - requestTotals: proxyMetrics.requestTotals.MustCurryWith(inboundLabels), - responseTotals: proxyMetrics.responseTotals.MustCurryWith(inboundLabels), - requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), - responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), - responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + requestTotals: proxyMetrics.requestTotals.MustCurryWith(inboundLabels), + responseTotals: proxyMetrics.responseTotals.MustCurryWith(inboundLabels), + requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(inboundLabels).(*prom.HistogramVec), + tcpAcceptOpenTotal: proxyMetrics.tcpAcceptOpenTotal.MustCurryWith(inboundTCPLabels), + tcpAcceptCloseTotal: proxyMetrics.tcpAcceptCloseTotal.MustCurryWith(inboundTCPLabels), + tcpConnectOpenTotal: proxyMetrics.tcpConnectOpenTotal.MustCurryWith(inboundTCPLabels), + tcpConnectCloseTotal: proxyMetrics.tcpConnectCloseTotal.MustCurryWith(inboundTCPLabels), + tcpConnectionsOpen: proxyMetrics.tcpConnectionsOpen.MustCurryWith(inboundTCPLabels), + tcpConnectionDurationMs: proxyMetrics.tcpConnectionDurationMs.MustCurryWith(inboundTCPLabels).(*prom.HistogramVec), + sentBytes: proxyMetrics.sentBytes.MustCurryWith(inboundTCPLabels), + receivedBytes: proxyMetrics.receivedBytes.MustCurryWith(inboundTCPLabels), }, outbound: &proxyMetricCollectors{ - requestTotals: proxyMetrics.requestTotals.MustCurryWith(outboundLables), - responseTotals: proxyMetrics.responseTotals.MustCurryWith(outboundLables), - requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(outboundLables).(*prom.HistogramVec), - responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(outboundLables).(*prom.HistogramVec), - responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(outboundLables).(*prom.HistogramVec), + requestTotals: proxyMetrics.requestTotals.MustCurryWith(outboundLabels), + responseTotals: proxyMetrics.responseTotals.MustCurryWith(outboundLabels), + requestDurationMs: proxyMetrics.requestDurationMs.MustCurryWith(outboundLabels).(*prom.HistogramVec), + responseLatencyMs: proxyMetrics.responseLatencyMs.MustCurryWith(outboundLabels).(*prom.HistogramVec), + responseDurationMs: proxyMetrics.responseDurationMs.MustCurryWith(outboundLabels).(*prom.HistogramVec), + tcpAcceptOpenTotal: proxyMetrics.tcpAcceptOpenTotal.MustCurryWith(outboundLabels), + tcpAcceptCloseTotal: proxyMetrics.tcpAcceptCloseTotal.MustCurryWith(outboundLabels), + tcpConnectOpenTotal: proxyMetrics.tcpConnectOpenTotal.MustCurryWith(outboundLabels), + tcpConnectCloseTotal: proxyMetrics.tcpConnectCloseTotal.MustCurryWith(outboundLabels), + tcpConnectionsOpen: proxyMetrics.tcpConnectionsOpen.MustCurryWith(outboundLabels), + tcpConnectionDurationMs: proxyMetrics.tcpConnectionDurationMs.MustCurryWith(outboundLabels).(*prom.HistogramVec), + sentBytes: proxyMetrics.sentBytes.MustCurryWith(outboundLabels), + receivedBytes: proxyMetrics.receivedBytes.MustCurryWith(outboundLabels), }, } @@ -378,6 +585,14 @@ func newSimulatedProxy(pod v1.Pod, deployments []string, replicaSets *k8s.Replic proxyMetrics.requestDurationMs, proxyMetrics.responseLatencyMs, proxyMetrics.responseDurationMs, + proxyMetrics.tcpAcceptOpenTotal, + proxyMetrics.tcpAcceptCloseTotal, + proxyMetrics.tcpConnectOpenTotal, + proxyMetrics.tcpConnectCloseTotal, + proxyMetrics.tcpConnectionsOpen, + proxyMetrics.tcpConnectionDurationMs, + proxyMetrics.sentBytes, + proxyMetrics.receivedBytes, ) return &proxy }