Add peer label to TCP read and write stat queries (#5903)

Add peer label to TCP read and write stat queries

Closes #5693

### Tests
---

After refactoring, `linkerd viz stat` behaves the same way (I haven't checked gateways or routes).

```
$ linkerd viz stat deploy/web -n emojivoto -o wide

NAME   MESHED   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99   TCP_CONN   READ_BYTES/SEC   WRITE_BYTES/SEC
web       1/1    91.91%   2.3rps           2ms           4ms           5ms          3         185.3B/s         5180.0B/s

# same value as before, latency seems to have dropped

time="2021-03-22T18:19:44Z" level=debug msg="Query request:\n\tsum(increase(tcp_write_bytes_total{deployment=\"web\", direction=\"inbound\", namespace=\"emojivoto\", peer=\"src\"}[1m])) by (namespace, deployment)"

time="2021-03-22T18:19:44Z" level=debug msg="Query request:\n\tsum(increase(tcp_read_bytes_total{deployment=\"web\", direction=\"inbound\", namespace=\"emojivoto\", peer=\"src\"}[1m])) by (namespace, deployment)"

# queries show the peer label
---

$ linkerd viz stat deploy/web -n emojivoto --from deploy/vote-bot -o wide

NAME   MESHED   SUCCESS      RPS   LATENCY_P50   LATENCY_P95   LATENCY_P99   TCP_CONN   READ_BYTES/SEC   WRITE_BYTES/SEC
web       1/1    93.16%   1.9rps           3ms           4ms           4ms          1        4503.4B/s          153.1B/s


# stats same as before except for latency which seems to have dropped a bit

time="2021-03-22T18:22:10Z" level=debug msg="Query request:\n\tsum(increase(tcp_write_bytes_total{deployment=\"vote-bot\", direction=\"outbound\", dst_deployment=\"web\", dst_namespace=\"emojivoto\", namespace=\"emojivoto\", peer=\"dst\"}[1m])) by (dst_namespace, dst_deployment)"

time="2021-03-22T18:22:10Z" level=debug msg="Query request:\n\tsum(increase(tcp_read_bytes_total{deployment=\"vote-bot\", direction=\"outbound\", dst_deployment=\"web\", dst_namespace=\"emojivoto\", namespace=\"emojivoto\", peer=\"dst\"}[1m])) by (dst_namespace, dst_deployment)"

# queries show the right label
```

Signed-off-by: mateiidavid <matei.david.35@gmail.com>
This commit is contained in:
Matei David 2021-03-26 17:36:30 +00:00 committed by GitHub
parent 073212ac6d
commit e798b33e2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 46 deletions

View File

@@ -117,10 +117,11 @@ func (s *grpcServer) getGatewaysMetrics(ctx context.Context, req *pb.GatewaysReq
labels, groupBy := buildGatewaysRequestLabels(req)
promQueries := map[promType]string{
promGatewayAlive: gatewayAliveQuery,
promGatewayAlive: fmt.Sprintf(gatewayAliveQuery, labels.String(), groupBy.String()),
}
metricsResp, err := s.getPrometheusMetrics(ctx, promQueries, gatewayLatencyQuantileQuery, labels.String(), timeWindow, groupBy.String())
quantileQueries := generateQuantileQueries(gatewayLatencyQuantileQuery, labels.String(), timeWindow, groupBy.String())
metricsResp, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, err

View File

@@ -24,16 +24,15 @@ type promResult struct {
}
const (
promGatewayAlive = promType("QUERY_GATEWAY_ALIVE")
promNumMirroredServices = promType("QUERY_NUM_MIRRORED_SERVICES")
promRequests = promType("QUERY_REQUESTS")
promActualRequests = promType("QUERY_ACTUAL_REQUESTS")
promTCPConnections = promType("QUERY_TCP_CONNECTIONS")
promTCPReadBytes = promType("QUERY_TCP_READ_BYTES")
promTCPWriteBytes = promType("QUERY_TCP_WRITE_BYTES")
promLatencyP50 = promType("0.5")
promLatencyP95 = promType("0.95")
promLatencyP99 = promType("0.99")
promGatewayAlive = promType("QUERY_GATEWAY_ALIVE")
promRequests = promType("QUERY_REQUESTS")
promActualRequests = promType("QUERY_ACTUAL_REQUESTS")
promTCPConnections = promType("QUERY_TCP_CONNECTIONS")
promTCPReadBytes = promType("QUERY_TCP_READ_BYTES")
promTCPWriteBytes = promType("QUERY_TCP_WRITE_BYTES")
promLatencyP50 = promType("0.5")
promLatencyP95 = promType("0.95")
promLatencyP99 = promType("0.99")
namespaceLabel = model.LabelName("namespace")
dstNamespaceLabel = model.LabelName("dst_namespace")
@@ -168,6 +167,16 @@ func generateLabelStringWithRegex(l model.LabelSet, labelName string, stringToMa
return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
}
// generate Prometheus queries for latency quantiles, based on a quantile query
// template, query labels, a time window and grouping.
func generateQuantileQueries(quantileQuery, labels, timeWindow, groupBy string) map[promType]string {
return map[promType]string{
promLatencyP50: fmt.Sprintf(quantileQuery, promLatencyP50, labels, timeWindow, groupBy),
promLatencyP95: fmt.Sprintf(quantileQuery, promLatencyP95, labels, timeWindow, groupBy),
promLatencyP99: fmt.Sprintf(quantileQuery, promLatencyP99, labels, timeWindow, groupBy),
}
}
// determine if we should add "namespace=<namespace>" to a named query
func shouldAddNamespaceLabel(resource *pb.Resource) bool {
return resource.Type != k8s.Namespace && resource.Namespace != ""
@@ -180,23 +189,21 @@ func promDirectionLabels(direction string) model.LabelSet {
}
}
func promPeerLabel(peer string) model.LabelSet {
return model.LabelSet{
model.LabelName("peer"): model.LabelValue(peer),
}
}
func promResourceType(resource *pb.Resource) model.LabelName {
l5dLabel := k8s.KindToL5DLabel(resource.Type)
return model.LabelName(l5dLabel)
}
func (s *grpcServer) getPrometheusMetrics(ctx context.Context, requestQueryTemplates map[promType]string, latencyQueryTemplate, labels, timeWindow, groupBy string) ([]promResult, error) {
func (s *grpcServer) getPrometheusMetrics(ctx context.Context, requestQueries map[promType]string, latencyQueries map[promType]string) ([]promResult, error) {
resultChan := make(chan promResult)
// kick off asynchronous queries: request count queries + 3 latency queries
for pt, requestQueryTemplate := range requestQueryTemplates {
var query string
if pt == promTCPConnections || pt == promGatewayAlive || pt == promNumMirroredServices {
query = fmt.Sprintf(requestQueryTemplate, labels, groupBy)
} else {
query = fmt.Sprintf(requestQueryTemplate, labels, timeWindow, groupBy)
}
for pt, query := range requestQueries {
go func(typ promType, promQuery string) {
resultVector, err := s.queryProm(ctx, promQuery)
resultChan <- promResult{
@@ -207,25 +214,20 @@ func (s *grpcServer) getPrometheusMetrics(ctx context.Context, requestQueryTempl
}(pt, query)
}
quantiles := []promType{promLatencyP50, promLatencyP95, promLatencyP99}
for _, quantile := range quantiles {
go func(quantile promType) {
latencyQuery := fmt.Sprintf(latencyQueryTemplate, quantile, labels, timeWindow, groupBy)
latencyResult, err := s.queryProm(ctx, latencyQuery)
for quantile, query := range latencyQueries {
go func(qt promType, promQuery string) {
resultVector, err := s.queryProm(ctx, promQuery)
resultChan <- promResult{
prom: quantile,
vec: latencyResult,
prom: qt,
vec: resultVector,
err: err,
}
}(quantile)
}(quantile, query)
}
// process results, receive one message per prometheus query type
var err error
results := []promResult{}
for i := 0; i < len(quantiles)+len(requestQueryTemplates); i++ {
for i := 0; i < len(latencyQueries)+len(requestQueries); i++ {
result := <-resultChan
if result.err != nil {
log.Errorf("queryProm failed with: %s", result.err)

View File

@@ -490,18 +490,35 @@ func buildTrafficSplitRequestLabels(req *pb.StatSummaryRequest) (labels model.La
return labels, groupBy
}
func buildTCPStatsRequestLabels(req *pb.StatSummaryRequest, reqLabels model.LabelSet) string {
switch req.Outbound.(type) {
case *pb.StatSummaryRequest_ToResource, *pb.StatSummaryRequest_FromResource:
// If TCP stats are queried from a resource to another one (i.e outbound -- from/to), then append peer='dst'
reqLabels = reqLabels.Merge(promPeerLabel("dst"))
default:
// If TCP stats are not queried from a specific resource (i.e inbound -- no to/from), then append peer='src'
reqLabels = reqLabels.Merge(promPeerLabel("src"))
}
return reqLabels.String()
}
func (s *grpcServer) getStatMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, error) {
reqLabels, groupBy := buildRequestLabels(req)
promQueries := map[promType]string{
promRequests: reqQuery,
promRequests: fmt.Sprintf(reqQuery, reqLabels.String(), timeWindow, groupBy.String()),
}
if req.TcpStats {
promQueries[promTCPConnections] = tcpConnectionsQuery
promQueries[promTCPReadBytes] = tcpReadBytesQuery
promQueries[promTCPWriteBytes] = tcpWriteBytesQuery
promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, reqLabels.String(), groupBy.String())
// For TCP read/write bytes total we add an additional 'peer' label with a value of either 'src' or 'dst'
tcpLabels := buildTCPStatsRequestLabels(req, reqLabels)
promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels, timeWindow, groupBy.String())
promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels, timeWindow, groupBy.String())
}
results, err := s.getPrometheusMetrics(ctx, promQueries, latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, nil, err
@@ -523,10 +540,11 @@ func (s *grpcServer) getTrafficSplitMetrics(ctx context.Context, req *pb.StatSum
reqLabels := generateLabelStringWithRegex(labels, "authority", stringToMatch)
promQueries := map[promType]string{
promRequests: reqQuery,
promRequests: fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String()),
}
results, err := s.getPrometheusMetrics(ctx, promQueries, latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
if err != nil {
return nil, err

View File

@@ -742,8 +742,8 @@ status:
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
`sum(tcp_open_connections{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}) by (namespace, pod)`,
`sum(increase(tcp_read_bytes_total{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
`sum(increase(tcp_write_bytes_total{direction="inbound", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
`sum(increase(tcp_read_bytes_total{direction="inbound", namespace="emojivoto", peer="src", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
`sum(increase(tcp_write_bytes_total{direction="inbound", namespace="emojivoto", peer="src", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
},
},
req: &pb.StatSummaryRequest{
@@ -769,6 +769,67 @@ status:
testStatSummary(t, expectations)
})
t.Run("Queries prometheus for outbound TCP stats if --to resource is specified", func(t *testing.T) {
expectations := []statSumExpected{
{
expectedStatRPC: expectedStatRPC{
err: nil,
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-1
namespace: emojivoto
labels:
app: emoji-svc
linkerd.io/control-plane-ns: linkerd
status:
phase: Running
`,
},
mockPromResponse: prometheusMetric("emojivoto-1", "pod"),
expectedPrometheusQueries: []string{
`histogram_quantile(0.5, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.95, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`histogram_quantile(0.99, sum(irate(response_latency_ms_bucket{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (le, namespace, pod))`,
`sum(increase(response_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}[1m])) by (namespace, pod, classification, tls)`,
`sum(tcp_open_connections{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", pod="emojivoto-1"}) by (namespace, pod)`,
`sum(increase(tcp_read_bytes_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", peer="dst", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
`sum(increase(tcp_write_bytes_total{direction="outbound", dst_namespace="emojivoto", dst_pod="emojivoto-2", namespace="emojivoto", peer="dst", pod="emojivoto-1"}[1m])) by (namespace, pod)`,
},
},
req: &pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Name: "emojivoto-1",
Namespace: "emojivoto",
Type: pkgK8s.Pod,
},
},
TimeWindow: "1m",
TcpStats: true,
Outbound: &pb.StatSummaryRequest_ToResource{
ToResource: &pb.Resource{
Name: "emojivoto-2",
Namespace: "emojivoto",
Type: pkgK8s.Pod,
},
},
},
expectedResponse: GenStatSummaryResponse("emojivoto-1", pkgK8s.Pod, []string{"emojivoto"}, &PodCounts{
Status: "Running",
MeshedPods: 1,
RunningPods: 1,
FailedPods: 0,
}, true, true),
},
}
testStatSummary(t, expectations)
})
t.Run("Queries prometheus for a specific resource if name is specified", func(t *testing.T) {
expectations := []statSumExpected{
{

View File

@@ -240,15 +240,16 @@ func (s *grpcServer) getRouteMetrics(ctx context.Context, req *pb.TopRoutesReque
groupBy := "rt_route"
queries := map[promType]string{
promRequests: routeReqQuery,
promRequests: fmt.Sprintf(routeReqQuery, reqLabels, timeWindow, groupBy),
}
if req.GetOutbound() != nil && req.GetNone() == nil {
// If this req has an Outbound, then query the actual request counts as well.
queries[promActualRequests] = actualRouteReqQuery
queries[promActualRequests] = fmt.Sprintf(actualRouteReqQuery, reqLabels, timeWindow, groupBy)
}
results, err := s.getPrometheusMetrics(ctx, queries, routeLatencyQuantileQuery, reqLabels, timeWindow, groupBy)
quantileQueries := generateQuantileQueries(routeLatencyQuantileQuery, reqLabels, timeWindow, groupBy)
results, err := s.getPrometheusMetrics(ctx, queries, quantileQueries)
if err != nil {
return nil, err
}