diff --git a/controller/api/public/grpc_server.go b/controller/api/public/grpc_server.go index 0f58f719a..7abb8a077 100644 --- a/controller/api/public/grpc_server.go +++ b/controller/api/public/grpc_server.go @@ -361,7 +361,12 @@ func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQ return queryResult{res: telemPb.QueryResponse{}, err: err} } - return s.query(ctx, req, query) + queryReq, err := reqToQueryReq(req, query) + if err != nil { + return queryResult{res: telemPb.QueryResponse{}, err: err} + } + + return s.query(ctx, queryReq) } func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) { @@ -372,15 +377,24 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (m return nil, err } + // omit query string, we'll fill it in later + queryReq, err := reqToQueryReq(req, "") + if err != nil { + return nil, err + } + results := make(chan queryResultWithLabel) // kick off requests for quantile, label := range quantileMap { go func(quantile string, label pb.HistogramLabel) { - q := fmt.Sprintf(quantileQuery, quantile, query) + // copy queryReq, gets us StartMS, EndMS, and Step + qr := queryReq + // insert our quantile-specific query + qr.Query = fmt.Sprintf(quantileQuery, quantile, query) results <- queryResultWithLabel{ - queryResult: s.query(ctx, req, q), + queryResult: s.query(ctx, qr), label: label, } }(quantile, label) @@ -401,24 +415,8 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (m return queryRsps, err } -func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) queryResult { - queryReq := &telemPb.QueryRequest{ - Query: query, - } - - start, end, step, err := queryParams(req) - if err != nil { - return queryResult{res: telemPb.QueryResponse{}, err: err} - } - - // EndMs always required to ensure deterministic timestamps - queryReq.EndMs = end - if !req.Summarize { - queryReq.StartMs = start - queryReq.Step = step - } - - queryRsp, err := s.telemetryClient.Query(ctx, queryReq) +func (s *grpcServer) query(ctx context.Context, queryReq telemPb.QueryRequest) queryResult { + queryRsp, err := s.telemetryClient.Query(ctx, &queryReq) if err != nil { return queryResult{res: telemPb.QueryResponse{}, err: err} } @@ -426,6 +424,26 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str return queryResult{res: *queryRsp, err: nil} } +func reqToQueryReq(req *pb.MetricRequest, query string) (telemPb.QueryRequest, error) { + start, end, step, err := queryParams(req) + if err != nil { + return telemPb.QueryRequest{}, err + } + + // EndMs always required to ensure deterministic timestamps + queryReq := telemPb.QueryRequest{ + Query: query, + EndMs: end, + } + + if !req.Summarize { + queryReq.StartMs = start + queryReq.Step = step + } + + return queryReq, nil +} + func formatQuery(query string, req *pb.MetricRequest, sumBy string) (string, error) { sumLabels := make([]string, 0) filterLabels := make([]string, 0) diff --git a/controller/api/public/grpc_server_test.go b/controller/api/public/grpc_server_test.go index 50c22e366..7c69233e1 100644 --- a/controller/api/public/grpc_server_test.go +++ b/controller/api/public/grpc_server_test.go @@ -3,6 +3,8 @@ package public import ( "context" "reflect" + "sort" + "sync/atomic" "testing" tap "github.com/runconduit/conduit/controller/gen/controller/tap" @@ -17,10 +19,19 @@ type mockTelemetry struct { client telemetry.TelemetryClient tRes *telemetry.QueryResponse mReq *pb.MetricRequest + ts int64 } // satisfies telemetry.TelemetryClient func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) { + + if !atomic.CompareAndSwapInt64(&m.ts, 0, in.EndMs) { + ts := atomic.LoadInt64(&m.ts) + if ts != in.EndMs { + m.test.Errorf("Timestamp changed across queries: %+v / %+v / %+v ", in, ts, in.EndMs) + } + } + if in.EndMs == 0 { m.test.Errorf("EndMs not set in telemetry request: %+v", in) } @@ -33,6 +44,13 @@ func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequ return nil, nil } +// sorting results makes it easier to compare against expected output +type ByHV []*pb.HistogramValue + +func (hv ByHV) Len() int { return len(hv) } +func (hv ByHV) Swap(i, j int) { hv[i], hv[j] = hv[j], hv[i] } +func (hv ByHV) Less(i, j int) bool { return hv[i].Label <= hv[j].Label } + type testResponse struct { tRes *telemetry.QueryResponse mReq *pb.MetricRequest @@ -114,6 +132,63 @@ func TestStat(t *testing.T) { }, }, }, + + testResponse{ + tRes: &telemetry.QueryResponse{ + Metrics: []*telemetry.Sample{ + &telemetry.Sample{ + Values: []*telemetry.SampleValue{ + &telemetry.SampleValue{Value: 1, TimestampMs: 2}, + }, + Labels: map[string]string{ + sourceDeployLabel: "sourceDeployLabel", + targetDeployLabel: "targetDeployLabel", + }, + }, + }, + }, + mReq: &pb.MetricRequest{ + Metrics: []pb.MetricName{ + pb.MetricName_LATENCY, + }, + Summarize: true, + Window: pb.TimeWindow_TEN_MIN, + }, + mRes: &pb.MetricResponse{ + Metrics: []*pb.MetricSeries{ + &pb.MetricSeries{ + Name: pb.MetricName_LATENCY, + Metadata: &pb.MetricMetadata{ + SourceDeploy: "sourceDeployLabel", + TargetDeploy: "targetDeployLabel", + }, + Datapoints: []*pb.MetricDatapoint{ + &pb.MetricDatapoint{ + Value: &pb.MetricValue{Value: &pb.MetricValue_Histogram{ + Histogram: &pb.Histogram{ + Values: []*pb.HistogramValue{ + &pb.HistogramValue{ + Label: pb.HistogramLabel_P50, + Value: 1, + }, + &pb.HistogramValue{ + Label: pb.HistogramLabel_P95, + Value: 1, + }, + &pb.HistogramValue{ + Label: pb.HistogramLabel_P99, + Value: 1, + }, + }, + }, + }}, + TimestampMs: 2, + }, + }, + }, + }, + }, + }, } for _, tr := range responses { @@ -124,6 +199,11 @@ func TestStat(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + switch res.Metrics[0].Name { + case pb.MetricName_LATENCY: + sort.Sort(ByHV(res.Metrics[0].Datapoints[0].Value.GetHistogram().Values)) + } + if !reflect.DeepEqual(res, tr.mRes) { t.Fatalf("Unexpected response:\n%+v\n!=\n%+v", res, tr.mRes) }