mirror of https://github.com/linkerd/linkerd2.git
Concurrent Telemetry requests (#323)
All requests from the public API service to the Telemetry service were made serially. In some cases a single request to the public API's Stat endpoint resulted in five serial requests to the Telemetry service. Make all requests from the public API to the Telemetry service concurrent. Signed-off-by: Andrew Seigner <siggy@buoyant.io> Part of #299
This commit is contained in:
parent
458e9d2ac5
commit
bffa5ff3e6
|
@ -29,6 +29,20 @@ type (
|
|||
failure float64
|
||||
}
|
||||
|
||||
// these structs couple responses with an error, useful when returning results via channels
|
||||
// metricResult carries the outcome of computing one requested metric:
// the series that were produced and the error, if any, from producing them.
// Bundling both lets a goroutine hand its outcome back over a single channel.
metricResult struct {
|
||||
// series holds the metric series computed for the request.
series []pb.MetricSeries
|
||||
// err is the error reported while computing series.
err error
|
||||
}
|
||||
// queryResult carries the outcome of one Telemetry query: the response
// (left as the zero value when the query failed) and the error, if any.
queryResult struct {
|
||||
// res is the Telemetry query response, stored by value.
res telemPb.QueryResponse
|
||||
// err is the error reported while building or executing the query.
err error
|
||||
}
|
||||
// queryResultWithLabel tags a queryResult with the histogram label the
// query was issued for, so results received from a shared channel can be
// matched back to their label.
queryResultWithLabel struct {
|
||||
// label identifies which histogram bucket label this result belongs to.
label pb.HistogramLabel
|
||||
// queryResult is embedded, exposing res and err directly.
queryResult
|
||||
}
|
||||
|
||||
// sortable slice of unix ms timestamps
|
||||
timestamps []int64
|
||||
)
|
||||
|
@ -77,42 +91,62 @@ func newGrpcServer(telemetryClient telemPb.TelemetryClient, tapClient tapPb.TapC
|
|||
}
|
||||
|
||||
func (s *grpcServer) Stat(ctx context.Context, req *pb.MetricRequest) (*pb.MetricResponse, error) {
|
||||
var err error
|
||||
resultsCh := make(chan metricResult)
|
||||
metrics := make([]*pb.MetricSeries, 0)
|
||||
|
||||
// kick off requests
|
||||
for _, metric := range req.Metrics {
|
||||
var err error
|
||||
var series []*pb.MetricSeries
|
||||
|
||||
switch metric {
|
||||
case pb.MetricName_REQUEST_RATE:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
series, err = s.requestRateMesh(ctx, req)
|
||||
} else {
|
||||
series, err = s.requestRate(ctx, req)
|
||||
}
|
||||
case pb.MetricName_SUCCESS_RATE:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
series, err = s.successRateMesh(ctx, req)
|
||||
} else {
|
||||
series, err = s.successRate(ctx, req)
|
||||
}
|
||||
case pb.MetricName_LATENCY:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
return nil, fmt.Errorf("latency not supported for MESH queries")
|
||||
} else {
|
||||
series, err = s.latency(ctx, req)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported metric: %s", metric)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
metrics = append(metrics, series...)
|
||||
go func(metric pb.MetricName) { resultsCh <- s.queryMetric(ctx, req, metric) }(metric)
|
||||
}
|
||||
|
||||
return &pb.MetricResponse{Metrics: metrics}, nil
|
||||
// process results
|
||||
for _ = range req.Metrics {
|
||||
result := <-resultsCh
|
||||
if result.err != nil {
|
||||
log.Errorf("Stat -> queryMetric failed with: %s", err)
|
||||
err = result.err
|
||||
} else {
|
||||
for _, ser := range result.series {
|
||||
metrics = append(metrics, &ser)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if an error occurred, return the error, along with partial results
|
||||
return &pb.MetricResponse{Metrics: metrics}, err
|
||||
}
|
||||
|
||||
func (s *grpcServer) queryMetric(ctx context.Context, req *pb.MetricRequest, metric pb.MetricName) metricResult {
|
||||
|
||||
result := metricResult{}
|
||||
|
||||
switch metric {
|
||||
case pb.MetricName_REQUEST_RATE:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
result.series, result.err = s.requestRateMesh(ctx, req)
|
||||
} else {
|
||||
result.series, result.err = s.requestRate(ctx, req)
|
||||
}
|
||||
case pb.MetricName_SUCCESS_RATE:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
result.series, result.err = s.successRateMesh(ctx, req)
|
||||
} else {
|
||||
result.series, result.err = s.successRate(ctx, req)
|
||||
}
|
||||
case pb.MetricName_LATENCY:
|
||||
if req.GroupBy == pb.AggregationType_MESH {
|
||||
result.series = nil
|
||||
result.err = fmt.Errorf("latency not supported for MESH queries")
|
||||
} else {
|
||||
result.series, result.err = s.latency(ctx, req)
|
||||
}
|
||||
default:
|
||||
result.series = nil
|
||||
result.err = fmt.Errorf("unsupported metric: %s", metric)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (_ *grpcServer) Version(ctx context.Context, req *pb.Empty) (*pb.VersionInfo, error) {
|
||||
|
@ -173,58 +207,84 @@ func (s *grpcServer) Tap(req *pb.TapRequest, stream pb.Api_TapServer) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *grpcServer) requestRate(ctx context.Context, req *pb.MetricRequest) ([]*pb.MetricSeries, error) {
|
||||
queryRsp, err := s.queryCount(ctx, req, countQuery, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (s *grpcServer) requestRate(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) {
|
||||
result := s.queryCount(ctx, req, countQuery, "")
|
||||
if result.err != nil {
|
||||
return nil, result.err
|
||||
}
|
||||
|
||||
return processRequestRate(queryRsp.Metrics, extractMetadata)
|
||||
return processRequestRate(result.res.Metrics, extractMetadata)
|
||||
}
|
||||
|
||||
func (s *grpcServer) requestRateMesh(ctx context.Context, req *pb.MetricRequest) ([]*pb.MetricSeries, error) {
|
||||
httpQueryRsp, err := s.queryCount(ctx, req, countHttpQuery, "")
|
||||
func (s *grpcServer) requestRateMesh(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) {
|
||||
var err error
|
||||
resultsCh := make(chan queryResult)
|
||||
metrics := make([]*telemPb.Sample, 0)
|
||||
|
||||
// kick off requests
|
||||
go func() { resultsCh <- s.queryCount(ctx, req, countHttpQuery, "") }()
|
||||
go func() { resultsCh <- s.queryCount(ctx, req, countGrpcQuery, "") }()
|
||||
|
||||
// process results, loop twice, for countHttpQuery and countGrpcQuery
|
||||
for i := 0; i < 2; i++ {
|
||||
result := <-resultsCh
|
||||
if result.err != nil {
|
||||
log.Errorf("requestRateMesh -> queryCount failed with: %s", err)
|
||||
err = result.err
|
||||
} else {
|
||||
metrics = append(metrics, result.res.Metrics...)
|
||||
}
|
||||
}
|
||||
|
||||
// if any errors occurred, return no results
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
grpcQueryRsp, err := s.queryCount(ctx, req, countGrpcQuery, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metrics := append(httpQueryRsp.Metrics, grpcQueryRsp.Metrics...)
|
||||
return processRequestRate(metrics, extractMetadataMesh)
|
||||
}
|
||||
|
||||
func (s *grpcServer) successRate(ctx context.Context, req *pb.MetricRequest) ([]*pb.MetricSeries, error) {
|
||||
queryRsp, err := s.queryCount(ctx, req, countQuery, "classification")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (s *grpcServer) successRate(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) {
|
||||
result := s.queryCount(ctx, req, countQuery, "classification")
|
||||
if result.err != nil {
|
||||
return nil, result.err
|
||||
}
|
||||
|
||||
return processSuccessRate(queryRsp.Metrics, extractMetadata, isSuccess)
|
||||
return processSuccessRate(result.res.Metrics, extractMetadata, isSuccess)
|
||||
}
|
||||
|
||||
func (s *grpcServer) successRateMesh(ctx context.Context, req *pb.MetricRequest) ([]*pb.MetricSeries, error) {
|
||||
httpQueryRsp, err := s.queryCount(ctx, req, countHttpQuery, "code")
|
||||
func (s *grpcServer) successRateMesh(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) {
|
||||
var err error
|
||||
resultsCh := make(chan queryResult)
|
||||
metrics := make([]*telemPb.Sample, 0)
|
||||
|
||||
// kick off requests
|
||||
go func() { resultsCh <- s.queryCount(ctx, req, countHttpQuery, "code") }()
|
||||
go func() { resultsCh <- s.queryCount(ctx, req, countGrpcQuery, "grpc_code") }()
|
||||
|
||||
// process results, loop twice, for countHttpQuery and countGrpcQuery
|
||||
for i := 0; i < 2; i++ {
|
||||
result := <-resultsCh
|
||||
if result.err != nil {
|
||||
log.Errorf("successRateMesh -> queryCount failed with: %s", err)
|
||||
err = result.err
|
||||
} else {
|
||||
metrics = append(metrics, result.res.Metrics...)
|
||||
}
|
||||
}
|
||||
|
||||
// if any errors occurred, return no results
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
grpcQueryRsp, err := s.queryCount(ctx, req, countGrpcQuery, "grpc_code")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metrics := append(httpQueryRsp.Metrics, grpcQueryRsp.Metrics...)
|
||||
return processSuccessRate(metrics, extractMetadataMesh, isSuccessMesh)
|
||||
}
|
||||
|
||||
func (s *grpcServer) latency(ctx context.Context, req *pb.MetricRequest) ([]*pb.MetricSeries, error) {
|
||||
func (s *grpcServer) latency(ctx context.Context, req *pb.MetricRequest) ([]pb.MetricSeries, error) {
|
||||
timestamps := make(map[int64]struct{})
|
||||
latencies := make(map[pb.MetricMetadata]map[int64][]*pb.HistogramValue)
|
||||
series := make([]*pb.MetricSeries, 0)
|
||||
series := make([]pb.MetricSeries, 0)
|
||||
|
||||
queryRsps, err := s.queryLatency(ctx, req)
|
||||
if err != nil {
|
||||
|
@ -284,7 +344,7 @@ func (s *grpcServer) latency(ctx context.Context, req *pb.MetricRequest) ([]*pb.
|
|||
}
|
||||
}
|
||||
|
||||
s := &pb.MetricSeries{
|
||||
s := pb.MetricSeries{
|
||||
Name: pb.MetricName_LATENCY,
|
||||
Metadata: &m,
|
||||
Datapoints: datapoints,
|
||||
|
@ -295,44 +355,53 @@ func (s *grpcServer) latency(ctx context.Context, req *pb.MetricRequest) ([]*pb.
|
|||
return series, nil
|
||||
}
|
||||
|
||||
func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQuery, sumBy string) (*telemPb.QueryResponse, error) {
|
||||
func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQuery, sumBy string) queryResult {
|
||||
query, err := formatQuery(rawQuery, req, sumBy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return queryResult{res: telemPb.QueryResponse{}, err: err}
|
||||
}
|
||||
|
||||
queryRsp, err := s.query(ctx, req, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return queryRsp, nil
|
||||
return s.query(ctx, req, query)
|
||||
}
|
||||
|
||||
// TODO: make these requests in parallel
|
||||
func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]*telemPb.QueryResponse, error) {
|
||||
queryRsps := make(map[pb.HistogramLabel]*telemPb.QueryResponse)
|
||||
func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) {
|
||||
queryRsps := make(map[pb.HistogramLabel]telemPb.QueryResponse)
|
||||
|
||||
query, err := formatQuery(latencyQuery, req, "le")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
results := make(chan queryResultWithLabel)
|
||||
|
||||
// kick off requests
|
||||
for quantile, label := range quantileMap {
|
||||
q := fmt.Sprintf(quantileQuery, quantile, query)
|
||||
go func(quantile string, label pb.HistogramLabel) {
|
||||
q := fmt.Sprintf(quantileQuery, quantile, query)
|
||||
|
||||
queryRsp, err := s.query(ctx, req, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queryRsps[label] = queryRsp
|
||||
results <- queryResultWithLabel{
|
||||
queryResult: s.query(ctx, req, q),
|
||||
label: label,
|
||||
}
|
||||
}(quantile, label)
|
||||
}
|
||||
|
||||
return queryRsps, nil
|
||||
// process results
|
||||
for _ = range quantileMap {
|
||||
result := <-results
|
||||
if result.err != nil {
|
||||
log.Errorf("queryLatency -> query failed with: %s", err)
|
||||
err = result.err
|
||||
} else {
|
||||
queryRsps[result.label] = result.res
|
||||
}
|
||||
}
|
||||
|
||||
// if an error occurred, return the error, along with partial results
|
||||
return queryRsps, err
|
||||
}
|
||||
|
||||
func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) (*telemPb.QueryResponse, error) {
|
||||
func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) queryResult {
|
||||
queryReq := &telemPb.QueryRequest{
|
||||
Query: query,
|
||||
}
|
||||
|
@ -340,7 +409,7 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str
|
|||
if !req.Summarize {
|
||||
start, end, step, err := queryParams(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return queryResult{res: telemPb.QueryResponse{}, err: err}
|
||||
}
|
||||
|
||||
queryReq.StartMs = start
|
||||
|
@ -350,10 +419,10 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str
|
|||
|
||||
queryRsp, err := s.telemetryClient.Query(ctx, queryReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return queryResult{res: telemPb.QueryResponse{}, err: err}
|
||||
}
|
||||
|
||||
return queryRsp, nil
|
||||
return queryResult{res: *queryRsp, err: nil}
|
||||
}
|
||||
|
||||
func formatQuery(query string, req *pb.MetricRequest, sumBy string) (string, error) {
|
||||
|
@ -447,8 +516,8 @@ func isSuccessMesh(labels map[string]string) (success bool) {
|
|||
func processRequestRate(
|
||||
metrics []*telemPb.Sample,
|
||||
metadataFn func(*telemPb.Sample) pb.MetricMetadata,
|
||||
) ([]*pb.MetricSeries, error) {
|
||||
series := make([]*pb.MetricSeries, 0)
|
||||
) ([]pb.MetricSeries, error) {
|
||||
series := make([]pb.MetricSeries, 0)
|
||||
|
||||
for _, metric := range metrics {
|
||||
if len(metric.Values) == 0 {
|
||||
|
@ -475,7 +544,7 @@ func processRequestRate(
|
|||
continue
|
||||
}
|
||||
|
||||
s := &pb.MetricSeries{
|
||||
s := pb.MetricSeries{
|
||||
Name: pb.MetricName_REQUEST_RATE,
|
||||
Metadata: &metadata,
|
||||
Datapoints: datapoints,
|
||||
|
@ -490,10 +559,10 @@ func processSuccessRate(
|
|||
metrics []*telemPb.Sample,
|
||||
metadataFn func(*telemPb.Sample) pb.MetricMetadata,
|
||||
successRateFn func(map[string]string) bool,
|
||||
) ([]*pb.MetricSeries, error) {
|
||||
) ([]pb.MetricSeries, error) {
|
||||
timestamps := make(map[int64]struct{})
|
||||
successRates := make(map[pb.MetricMetadata]map[int64]*successRate)
|
||||
series := make([]*pb.MetricSeries, 0)
|
||||
series := make([]pb.MetricSeries, 0)
|
||||
|
||||
for _, metric := range metrics {
|
||||
if len(metric.Values) == 0 {
|
||||
|
@ -545,7 +614,7 @@ func processSuccessRate(
|
|||
}
|
||||
}
|
||||
|
||||
s := &pb.MetricSeries{
|
||||
s := pb.MetricSeries{
|
||||
Name: pb.MetricName_SUCCESS_RATE,
|
||||
Metadata: &m,
|
||||
Datapoints: datapoints,
|
||||
|
|
Loading…
Reference in New Issue