Adding edges endpoint to public API (#2793)

This change adds an endpoint to the public API to allow us to query Prometheus for edge data, in order to display identity information for connections between Linkerd proxies. This PR only includes changes to the controller and protobuf.
This commit is contained in:
Carol A. Scott 2019-05-09 09:30:11 -07:00 committed by GitHub
parent 2ae0daca9f
commit 87e69bf885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 784 additions and 236 deletions

View File

@ -51,6 +51,12 @@ func (c *grpcOverHTTPClient) StatSummary(ctx context.Context, req *pb.StatSummar
return &msg, err
}
func (c *grpcOverHTTPClient) Edges(ctx context.Context, req *pb.EdgesRequest, _ ...grpc.CallOption) (*pb.EdgesResponse, error) {
var msg pb.EdgesResponse
err := c.apiRequest(ctx, "Edges", req, &msg)
return &msg, err
}
func (c *grpcOverHTTPClient) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest, _ ...grpc.CallOption) (*pb.TopRoutesResponse, error) {
var msg pb.TopRoutesResponse
err := c.apiRequest(ctx, "TopRoutes", req, &msg)

View File

@ -0,0 +1,144 @@
package public
import (
"context"
"errors"
"fmt"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
)
const (
inboundIdentityQuery = "count(response_total%s) by (%s, client_id)"
outboundIdentityQuery = "count(response_total%s) by (%s, dst_%s, server_id, no_tls_reason)"
)
var formatMsg = map[string]string{
"disabled": "Disabled",
"loopback": "Loopback",
"no_authority_in_http_request": "No Authority In HTTP Request",
"not_http": "Not HTTP",
"not_provided_by_remote": "Not Provided By Remote",
"not_provided_by_service_discovery": "Not Provided By Service Discovery",
}
func (s *grpcServer) Edges(ctx context.Context, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
log.Debugf("Edges request: %+v", req)
if req.GetSelector().GetResource() == nil {
return edgesError(req, "Edges request missing Selector Resource"), nil
}
edges, err := s.getEdges(ctx, req)
if err != nil {
return edgesError(req, err.Error()), nil
}
return &pb.EdgesResponse{
Response: &pb.EdgesResponse_Ok_{
Ok: &pb.EdgesResponse_Ok{
Edges: edges,
},
},
}, nil
}
func edgesError(req *pb.EdgesRequest, message string) *pb.EdgesResponse {
return &pb.EdgesResponse{
Response: &pb.EdgesResponse_Error{
Error: &pb.ResourceError{
Resource: req.GetSelector().GetResource(),
Error: message,
},
},
}
}
func (s *grpcServer) getEdges(ctx context.Context, req *pb.EdgesRequest) ([]*pb.Edge, error) {
labelNames := promGroupByLabelNames(req.Selector.Resource)
if len(labelNames) != 2 {
return nil, errors.New("unexpected resource selector")
}
resourceType := string(labelNames[1]) // skipping first name which is always namespace
labels := promQueryLabels(req.Selector.Resource)
labelsOutbound := labels.Merge(promDirectionLabels("outbound"))
labelsInbound := labels.Merge(promDirectionLabels("inbound"))
inboundQuery := fmt.Sprintf(inboundIdentityQuery, labelsInbound, resourceType)
outboundQuery := fmt.Sprintf(outboundIdentityQuery, labelsOutbound, resourceType, resourceType)
inboundResult, err := s.queryProm(ctx, inboundQuery)
if err != nil {
return nil, err
}
outboundResult, err := s.queryProm(ctx, outboundQuery)
if err != nil {
return nil, err
}
edge := processEdgeMetrics(inboundResult, outboundResult, resourceType)
return edge, nil
}
func processEdgeMetrics(inbound, outbound model.Vector, resourceType string) []*pb.Edge {
edges := []*pb.Edge{}
dstIndex := map[model.LabelValue]model.Metric{}
srcIndex := map[model.LabelValue][]model.Metric{}
resourceReplacementInbound := resourceType
resourceReplacementOutbound := "dst_" + resourceType
for _, sample := range inbound {
// skip any inbound results that do not have a client_id, because this means
// the communication was one-sided (i.e. a probe or another instance where
// the src/dst are not both known) in future the edges command will support
// one-sided edges
if _, ok := sample.Metric[model.LabelName("client_id")]; ok {
key := sample.Metric[model.LabelName(resourceReplacementInbound)]
dstIndex[key] = sample.Metric
}
}
for _, sample := range outbound {
// skip any outbound results that do not have a server_id for same reason as
// above section
if _, ok := sample.Metric[model.LabelName("server_id")]; ok {
key := sample.Metric[model.LabelName(resourceReplacementOutbound)]
if _, ok := srcIndex[key]; !ok {
srcIndex[key] = []model.Metric{}
}
srcIndex[key] = append(srcIndex[key], sample.Metric)
}
}
for key, sources := range srcIndex {
for _, src := range sources {
dst, ok := dstIndex[key]
if !ok {
log.Errorf("missing resource in destination metrics: %s", key)
continue
}
msg := ""
if val, ok := src[model.LabelName("no_tls_reason")]; ok {
msg = formatMsg[string(val)]
}
edge := &pb.Edge{
Src: &pb.Resource{
Name: string(src[model.LabelName(resourceType)]),
Type: resourceType,
},
Dst: &pb.Resource{
Name: string(dst[model.LabelName(resourceType)]),
Type: resourceType,
},
ClientId: string(dst[model.LabelName("client_id")]),
ServerId: string(src[model.LabelName("server_id")]),
NoIdentityMsg: msg,
}
edges = append(edges, edge)
}
}
return edges
}

View File

@ -26,6 +26,7 @@ var (
tapByResourcePath = fullURLPathFor("TapByResource")
selfCheckPath = fullURLPathFor("SelfCheck")
endpointsPath = fullURLPathFor("Endpoints")
edgesPath = fullURLPathFor("Edges")
configPath = fullURLPathFor("Config")
)
@ -61,6 +62,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleSelfCheck(w, req)
case endpointsPath:
h.handleEndpoints(w, req)
case edgesPath:
h.handleEdges(w, req)
case configPath:
h.handleConfig(w, req)
default:
@ -90,6 +93,27 @@ func (h *handler) handleStatSummary(w http.ResponseWriter, req *http.Request) {
}
}
func (h *handler) handleEdges(w http.ResponseWriter, req *http.Request) {
var protoRequest pb.EdgesRequest
err := httpRequestToProto(req, &protoRequest)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
rsp, err := h.grpcServer.Edges(req.Context(), &protoRequest)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
err = writeProtoToHTTPResponse(w, rsp)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
}
func (h *handler) handleTopRoutes(w http.ResponseWriter, req *http.Request) {
var protoRequest pb.TopRoutesRequest

View File

@ -36,6 +36,11 @@ func (m *mockGrpcServer) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest
return m.ResponseToReturn.(*pb.TopRoutesResponse), m.ErrorToReturn
}
func (m *mockGrpcServer) Edges(ctx context.Context, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
m.LastRequestReceived = req
return m.ResponseToReturn.(*pb.EdgesResponse), m.ErrorToReturn
}
func (m *mockGrpcServer) Version(ctx context.Context, req *pb.Empty) (*pb.VersionInfo, error) {
m.LastRequestReceived = req
return m.ResponseToReturn.(*pb.VersionInfo), m.ErrorToReturn

View File

@ -27,6 +27,7 @@ type MockAPIClient struct {
ListServicesResponseToReturn *pb.ListServicesResponse
StatSummaryResponseToReturn *pb.StatSummaryResponse
TopRoutesResponseToReturn *pb.TopRoutesResponse
EdgesResponseToReturn *pb.EdgesResponse
SelfCheckResponseToReturn *healthcheckPb.SelfCheckResponse
ConfigResponseToReturn *configPb.All
APITapClientToReturn pb.Api_TapClient
@ -44,6 +45,11 @@ func (c *MockAPIClient) TopRoutes(ctx context.Context, in *pb.TopRoutesRequest,
return c.TopRoutesResponseToReturn, c.ErrorToReturn
}
// Edges provides a mock of a Public API method.
func (c *MockAPIClient) Edges(ctx context.Context, in *pb.EdgesRequest, opts ...grpc.CallOption) (*pb.EdgesResponse, error) {
return c.EdgesResponseToReturn, c.ErrorToReturn
}
// Version provides a mock of a Public API method.
func (c *MockAPIClient) Version(ctx context.Context, in *pb.Empty, opts ...grpc.CallOption) (*pb.VersionInfo, error) {
return c.VersionInfoToReturn, c.ErrorToReturn

File diff suppressed because it is too large Load Diff

View File

@ -378,6 +378,29 @@ message StatTable {
}
}
message EdgesRequest {
ResourceSelection selector = 1;
}
message EdgesResponse {
oneof response {
Ok ok = 1;
ResourceError error = 2;
}
message Ok {
repeated Edge edges = 1;
}
}
message Edge {
Resource src = 1;
Resource dst = 2;
string client_id = 3;
string server_id = 4;
string no_identity_msg = 5;
}
message TopRoutesRequest {
ResourceSelection selector = 1;
string time_window = 2;
@ -415,6 +438,8 @@ message RouteTable {
service Api {
rpc StatSummary(StatSummaryRequest) returns (StatSummaryResponse) {}
rpc Edges(EdgesRequest) returns (EdgesResponse) {}
rpc TopRoutes(TopRoutesRequest) returns (TopRoutesResponse) {}
rpc ListPods(ListPodsRequest) returns (ListPodsResponse) {}