From 1db7d2a2fb467bbae3960ee2297c7e057a484416 Mon Sep 17 00:00:00 2001
From: Andrew Seigner
Date: Tue, 13 Feb 2018 16:26:54 -0800
Subject: [PATCH] Ensure latency quantile queries match timestamps (#348)

In PR #298 we moved time window parsing (10s => (time.now - 10s,
time.now)) down the stack to immediately before the query. This had the
unintended effect of creating parallel latency quantile requests with
slightly different timestamps.

This change parses the time window prior to latency quantile fan-out,
ensuring all requests have the same timestamp.

Signed-off-by: Andrew Seigner
---
 controller/api/public/grpc_server.go      | 60 +++++++++++------
 controller/api/public/grpc_server_test.go | 80 +++++++++++++++++++++++
 2 files changed, 119 insertions(+), 21 deletions(-)

diff --git a/controller/api/public/grpc_server.go b/controller/api/public/grpc_server.go
index 0f58f719a..7abb8a077 100644
--- a/controller/api/public/grpc_server.go
+++ b/controller/api/public/grpc_server.go
@@ -361,7 +361,12 @@ func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQ
 		return queryResult{res: telemPb.QueryResponse{}, err: err}
 	}
 
-	return s.query(ctx, req, query)
+	queryReq, err := reqToQueryReq(req, query)
+	if err != nil {
+		return queryResult{res: telemPb.QueryResponse{}, err: err}
+	}
+
+	return s.query(ctx, queryReq)
 }
 
 func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) {
@@ -372,15 +377,24 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (m
 		return nil, err
 	}
 
+	// omit query string, we'll fill it in later
+	queryReq, err := reqToQueryReq(req, "")
+	if err != nil {
+		return nil, err
+	}
+
 	results := make(chan queryResultWithLabel)
 
 	// kick off requests
 	for quantile, label := range quantileMap {
 		go func(quantile string, label pb.HistogramLabel) {
-			q := fmt.Sprintf(quantileQuery, quantile, query)
+			// copy queryReq, gets us StartMs, EndMs, and Step
+			qr := queryReq
+			// insert our quantile-specific query
+			qr.Query = fmt.Sprintf(quantileQuery, quantile, query)
 
 			results <- queryResultWithLabel{
-				queryResult: s.query(ctx, req, q),
+				queryResult: s.query(ctx, qr),
 				label:       label,
 			}
 		}(quantile, label)
@@ -401,24 +415,8 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (m
 	return queryRsps, err
 }
 
-func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) queryResult {
-	queryReq := &telemPb.QueryRequest{
-		Query: query,
-	}
-
-	start, end, step, err := queryParams(req)
-	if err != nil {
-		return queryResult{res: telemPb.QueryResponse{}, err: err}
-	}
-
-	// EndMs always required to ensure deterministic timestamps
-	queryReq.EndMs = end
-	if !req.Summarize {
-		queryReq.StartMs = start
-		queryReq.Step = step
-	}
-
-	queryRsp, err := s.telemetryClient.Query(ctx, queryReq)
+func (s *grpcServer) query(ctx context.Context, queryReq telemPb.QueryRequest) queryResult {
+	queryRsp, err := s.telemetryClient.Query(ctx, &queryReq)
 	if err != nil {
 		return queryResult{res: telemPb.QueryResponse{}, err: err}
 	}
@@ -426,6 +424,26 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str
 	return queryResult{res: *queryRsp, err: nil}
 }
 
+func reqToQueryReq(req *pb.MetricRequest, query string) (telemPb.QueryRequest, error) {
+	start, end, step, err := queryParams(req)
+	if err != nil {
+		return telemPb.QueryRequest{}, err
+	}
+
+	// EndMs always required to ensure deterministic timestamps
+	queryReq := telemPb.QueryRequest{
+		Query: query,
+		EndMs: end,
+	}
+
+	if !req.Summarize {
+		queryReq.StartMs = start
+		queryReq.Step = step
+	}
+
+	return queryReq, nil
+}
+
 func formatQuery(query string, req *pb.MetricRequest, sumBy string) (string, error) {
 	sumLabels := make([]string, 0)
 	filterLabels := make([]string, 0)
diff --git a/controller/api/public/grpc_server_test.go b/controller/api/public/grpc_server_test.go
index 50c22e366..7c69233e1 100644
--- a/controller/api/public/grpc_server_test.go
+++ b/controller/api/public/grpc_server_test.go
@@ -3,6 +3,8 @@ package public
 import (
 	"context"
 	"reflect"
+	"sort"
+	"sync/atomic"
 	"testing"
 
 	tap "github.com/runconduit/conduit/controller/gen/controller/tap"
@@ -17,10 +19,19 @@ type mockTelemetry struct {
 	client telemetry.TelemetryClient
 	tRes   *telemetry.QueryResponse
 	mReq   *pb.MetricRequest
+	ts     int64
 }
 
 // satisfies telemetry.TelemetryClient
 func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) {
+
+	if !atomic.CompareAndSwapInt64(&m.ts, 0, in.EndMs) {
+		ts := atomic.LoadInt64(&m.ts)
+		if ts != in.EndMs {
+			m.test.Errorf("Timestamp changed across queries: %+v / %+v / %+v", in, ts, in.EndMs)
+		}
+	}
+
 	if in.EndMs == 0 {
 		m.test.Errorf("EndMs not set in telemetry request: %+v", in)
 	}
@@ -33,6 +44,13 @@ func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequ
 	return nil, nil
 }
 
+// sorting results makes it easier to compare against expected output
+type ByHV []*pb.HistogramValue
+
+func (hv ByHV) Len() int           { return len(hv) }
+func (hv ByHV) Swap(i, j int)      { hv[i], hv[j] = hv[j], hv[i] }
+func (hv ByHV) Less(i, j int) bool { return hv[i].Label <= hv[j].Label }
+
 type testResponse struct {
 	tRes *telemetry.QueryResponse
 	mReq *pb.MetricRequest
@@ -114,6 +132,63 @@ func TestStat(t *testing.T) {
 				},
 			},
 		},
+
+		testResponse{
+			tRes: &telemetry.QueryResponse{
+				Metrics: []*telemetry.Sample{
+					&telemetry.Sample{
+						Values: []*telemetry.SampleValue{
+							&telemetry.SampleValue{Value: 1, TimestampMs: 2},
+						},
+						Labels: map[string]string{
+							sourceDeployLabel: "sourceDeployLabel",
+							targetDeployLabel: "targetDeployLabel",
+						},
+					},
+				},
+			},
+			mReq: &pb.MetricRequest{
+				Metrics: []pb.MetricName{
+					pb.MetricName_LATENCY,
+				},
+				Summarize: true,
+				Window:    pb.TimeWindow_TEN_MIN,
+			},
+			mRes: &pb.MetricResponse{
+				Metrics: []*pb.MetricSeries{
+					&pb.MetricSeries{
+						Name: pb.MetricName_LATENCY,
+						Metadata: &pb.MetricMetadata{
+							SourceDeploy: "sourceDeployLabel",
+							TargetDeploy: "targetDeployLabel",
+						},
+						Datapoints: []*pb.MetricDatapoint{
+							&pb.MetricDatapoint{
+								Value: &pb.MetricValue{Value: &pb.MetricValue_Histogram{
+									Histogram: &pb.Histogram{
+										Values: []*pb.HistogramValue{
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P50,
+												Value: 1,
+											},
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P95,
+												Value: 1,
+											},
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P99,
+												Value: 1,
+											},
+										},
+									},
+								}},
+								TimestampMs: 2,
+							},
+						},
+					},
+				},
+			},
+		},
 	}
 
 	for _, tr := range responses {
@@ -124,6 +199,11 @@ func TestStat(t *testing.T) {
 			t.Fatalf("Unexpected error: %v", err)
 		}
 
+		switch res.Metrics[0].Name {
+		case pb.MetricName_LATENCY:
+			sort.Sort(ByHV(res.Metrics[0].Datapoints[0].Value.GetHistogram().Values))
+		}
+
 		if !reflect.DeepEqual(res, tr.mRes) {
 			t.Fatalf("Unexpected response:\n%+v\n!=\n%+v", res, tr.mRes)
 		}
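
Aside, for illustration (not part of the patch): the essence of the fix is
hoisting the time window calculation out of the per-quantile goroutines, so
the P50/P95/P99 queries all share one EndMs. Below is a minimal,
self-contained Go sketch of that pattern; the names (queryWindow,
resolveWindow) are hypothetical stand-ins, not the conduit API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// queryWindow plays the role of telemPb.QueryRequest's StartMs/EndMs:
// a relative time window resolved to concrete timestamps exactly once.
// (Hypothetical type for illustration.)
type queryWindow struct {
	startMs int64
	endMs   int64
}

// resolveWindow converts a relative window (e.g. 10s) into fixed
// millisecond timestamps, before any queries are issued.
func resolveWindow(window time.Duration) queryWindow {
	now := time.Now()
	return queryWindow{
		startMs: now.Add(-window).UnixNano() / int64(time.Millisecond),
		endMs:   now.UnixNano() / int64(time.Millisecond),
	}
}

func main() {
	quantiles := []string{"0.5", "0.95", "0.99"}

	// Resolve the window once, prior to the fan-out...
	w := resolveWindow(10 * time.Second)

	var wg sync.WaitGroup
	for _, q := range quantiles {
		wg.Add(1)
		go func(quantile string) {
			defer wg.Done()
			// ...so every parallel quantile query carries the same
			// timestamps. Calling time.Now() here instead, as the
			// pre-#348 code effectively did by parsing the window
			// per query, lets concurrent queries land on different
			// milliseconds.
			fmt.Printf("quantile=%s startMs=%d endMs=%d\n",
				quantile, w.startMs, w.endMs)
		}(q)
	}
	wg.Wait()
}

This is also what the mockTelemetry.Query CompareAndSwap check in the test
asserts: the first observed EndMs is recorded, and any later query arriving
with a different EndMs fails the test.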