Expand 'linkerd edges' to work with TCP connections (#5040)

* Expand 'linkerd edges' to work with TCP connections

Fixes #4999

Before:
```
$ bin/linkerd edges po -owide
SRC                                   DST                                    SRC_NS    DST_NS    CLIENT_ID   SERVER_ID   SECURED
linkerd-prometheus-764ddd4f88-t6c2j   rabbitmq-controller-5c6cf7cc6d-8lxp2   linkerd   default                           √
linkerd-prometheus-764ddd4f88-t6c2j   temp                                   linkerd   default                           √

```

After:
```
$ bin/linkerd edges po -owide
SRC                                   DST                                    SRC_NS    DST_NS    CLIENT_ID         SERVER_ID         SECURED
temp                                  rabbitmq-controller-5c6cf7cc6d-5fpsc   default   default   default.default   default.default   √
linkerd-prometheus-66fb97b7fc-vpnxf   rabbitmq-controller-5c6cf7cc6d-5fpsc   linkerd   default                                       √
linkerd-prometheus-66fb97b7fc-vpnxf   temp                                   linkerd   default                                       √
```

With the latest proxy upgrade to v2.113.0 (#5037), the `tcp_open_total` metric now contains the `client_id` label so that we can replace the http-only metric `response_total` with this one to determine edges for TCP-only connections.

This change basically performs the same query as before, but two times, one for `response_total` and another for `tcp_open_total`. For each resulting entry, the latter is kept if `client_id` is present, otherwise the former is used (if present at all). That way things keep on working for older proxies.

Disclaimers:
- This doesn't fix #3706: if two sources connect to the same destination there's no way to tell them appart from the metrics perspective and their edges can get mangled. To fix that, the proxy would have to expose `src_resource` labels in the `tcp_open_total` total inbound metric.
- Note connections coming from prometheus are still unidentified. The reason is those hit the proxy's admin server (instead of the main container) which doesn't expose metrics.
This commit is contained in:
Alejandro Pedraza 2020-10-12 09:14:39 -05:00 committed by GitHub
parent 3af25fa886
commit 777b06ac55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 50 additions and 11 deletions

View File

@ -13,8 +13,8 @@ import (
) )
const ( const (
inboundIdentityQuery = "count(response_total%s) by (%s, client_id, namespace, no_tls_reason)" inboundIdentityQuery = "count(%s%s) by (%s, client_id, namespace, no_tls_reason)"
outboundIdentityQuery = "count(response_total%s) by (%s, dst_%s, server_id, namespace, dst_namespace, no_tls_reason)" outboundIdentityQuery = "count(%s%s) by (%s, dst_%s, server_id, namespace, dst_namespace, no_tls_reason)"
) )
var formatMsg = map[string]string{ var formatMsg = map[string]string{
@ -32,10 +32,43 @@ func (s *grpcServer) Edges(ctx context.Context, req *pb.EdgesRequest) (*pb.Edges
return edgesError(req, "Edges request missing Selector Resource"), nil return edgesError(req, "Edges request missing Selector Resource"), nil
} }
edges, err := s.getEdges(ctx, req) edgesHTTP, err := s.getEdges(ctx, req, "response_total")
if err != nil { if err != nil {
return edgesError(req, err.Error()), nil return edgesError(req, err.Error()), nil
} }
edgesTCP, err := s.getEdges(ctx, req, "tcp_open_total")
if err != nil {
return edgesError(req, err.Error()), nil
}
edges := []*pb.Edge{}
// iterate over tcp_open_total metrics
for _, edgeTCP := range edgesTCP {
edgeHTTPIndex := -1
// find the corresponding response_total entry
for i, edgeHTTP := range edgesHTTP {
if equalEdges(edgeTCP, edgeHTTP) {
edgeHTTPIndex = i
break
}
}
edge := edgeTCP
if edgeHTTPIndex > -1 {
// if tcp_open_total doesn't have client_id info,
// use the response_total metric instead
if edge.ClientId == "" {
edge = edgesHTTP[edgeHTTPIndex]
}
// trick to remove element quickly
edgesHTTP[edgeHTTPIndex] = edgesHTTP[len(edgesHTTP)-1]
edgesHTTP = edgesHTTP[:len(edgesHTTP)-1]
}
edges = append(edges, edge)
}
edges = append(edges, edgesHTTP...)
edges = sortEdgeRows(edges)
return &pb.EdgesResponse{ return &pb.EdgesResponse{
Response: &pb.EdgesResponse_Ok_{ Response: &pb.EdgesResponse_Ok_{
@ -57,7 +90,7 @@ func edgesError(req *pb.EdgesRequest, message string) *pb.EdgesResponse {
} }
} }
func (s *grpcServer) getEdges(ctx context.Context, req *pb.EdgesRequest) ([]*pb.Edge, error) { func (s *grpcServer) getEdges(ctx context.Context, req *pb.EdgesRequest, metric string) ([]*pb.Edge, error) {
labelNames := promGroupByLabelNames(req.Selector.Resource) labelNames := promGroupByLabelNames(req.Selector.Resource)
if len(labelNames) != 2 { if len(labelNames) != 2 {
return nil, errors.New("unexpected resource selector") return nil, errors.New("unexpected resource selector")
@ -71,8 +104,8 @@ func (s *grpcServer) getEdges(ctx context.Context, req *pb.EdgesRequest) ([]*pb.
labelsOutboundStr := generateLabelStringWithExclusion(labelsOutbound, resourceType) labelsOutboundStr := generateLabelStringWithExclusion(labelsOutbound, resourceType)
labelsInboundStr := generateLabelStringWithExclusion(labelsInbound, resourceType) labelsInboundStr := generateLabelStringWithExclusion(labelsInbound, resourceType)
outboundQuery := fmt.Sprintf(outboundIdentityQuery, labelsOutboundStr, resourceType, resourceType) outboundQuery := fmt.Sprintf(outboundIdentityQuery, metric, labelsOutboundStr, resourceType, resourceType)
inboundQuery := fmt.Sprintf(inboundIdentityQuery, labelsInboundStr, resourceType) inboundQuery := fmt.Sprintf(inboundIdentityQuery, metric, labelsInboundStr, resourceType)
inboundResult, err := s.queryProm(ctx, inboundQuery) inboundResult, err := s.queryProm(ctx, inboundQuery)
if err != nil { if err != nil {
@ -84,8 +117,8 @@ func (s *grpcServer) getEdges(ctx context.Context, req *pb.EdgesRequest) ([]*pb.
return nil, err return nil, err
} }
edge := processEdgeMetrics(inboundResult, outboundResult, resourceType, selectedNamespace) edges := processEdgeMetrics(inboundResult, outboundResult, resourceType, selectedNamespace)
return edge, nil return edges, nil
} }
func processEdgeMetrics(inbound, outbound model.Vector, resourceType, selectedNamespace string) []*pb.Edge { func processEdgeMetrics(inbound, outbound model.Vector, resourceType, selectedNamespace string) []*pb.Edge {
@ -110,6 +143,10 @@ func processEdgeMetrics(inbound, outbound model.Vector, resourceType, selectedNa
} }
for _, sample := range outbound { for _, sample := range outbound {
// e.g. tcp_open_total from older proxies don't have enough labels
if _, ok := sample.Metric[model.LabelName("dst_namespace")]; !ok {
continue
}
dstResource := sample.Metric[model.LabelName(resourceReplacementOutbound)] dstResource := sample.Metric[model.LabelName(resourceReplacementOutbound)]
srcNs := sample.Metric[model.LabelName("namespace")] srcNs := sample.Metric[model.LabelName("namespace")]
@ -190,9 +227,6 @@ func processEdgeMetrics(inbound, outbound model.Vector, resourceType, selectedNa
} }
} }
// sort rows before returning in order to have a consistent order for tests
edges = sortEdgeRows(edges)
return edges return edges
} }
@ -204,3 +238,8 @@ func sortEdgeRows(rows []*pb.Edge) []*pb.Edge {
}) })
return rows return rows
} }
func equalEdges(a, b *pb.Edge) bool {
return a.Src.Namespace == b.Src.Namespace && a.Src.Name == b.Src.Name && a.Src.Type == b.Src.Type &&
a.Dst.Namespace == b.Dst.Namespace && a.Dst.Name == b.Dst.Name && a.Dst.Type == b.Dst.Type
}