Ensure latency quantile queries match timestamps (#348)

In PR #298 we moved time window parsing (10s => (time.now - 10s,
time.now)) down the stack to immediately before the query. This had
the unintended effect of creating parallel latency quantile requests
with slightly different timestamps.

This change parses the time window prior to the latency quantile
fan-out, ensuring all requests share the same timestamp.
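
To make the failure mode concrete, here is a minimal standalone
sketch (the windowToRange helper and the 10s window are illustrative,
not code from this repo): when each quantile query parses the window
itself, each one captures its own time.Now(), so end timestamps
drift; parsing once before the fan-out pins a single timestamp.

package main

import (
	"fmt"
	"time"
)

// windowToRange mimics parsing a window like "10s" into
// (time.Now() - 10s, time.Now()). Illustrative helper, not repo code.
func windowToRange(window time.Duration) (startMs, endMs int64) {
	end := time.Now()
	return end.Add(-window).UnixMilli(), end.UnixMilli()
}

func main() {
	quantiles := []string{"0.5", "0.95", "0.99"}

	// Buggy shape: each quantile query parses the window itself, so
	// each captures its own time.Now() and EndMs values can differ.
	for _, q := range quantiles {
		_, endMs := windowToRange(10 * time.Second)
		fmt.Printf("buggy: quantile=%s endMs=%d\n", q, endMs)
		time.Sleep(time.Millisecond) // exaggerate the skew
	}

	// Fixed shape: parse once before the fan-out; every query shares EndMs.
	_, endMs := windowToRange(10 * time.Second)
	for _, q := range quantiles {
		fmt.Printf("fixed: quantile=%s endMs=%d\n", q, endMs)
	}
}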

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
Andrew Seigner 2018-02-13 16:26:54 -08:00 committed by GitHub
parent aa123b8ad5
commit 1db7d2a2fb
2 changed files with 119 additions and 21 deletions

@@ -361,7 +361,12 @@ func (s *grpcServer) queryCount(ctx context.Context, req *pb.MetricRequest, rawQ
 		return queryResult{res: telemPb.QueryResponse{}, err: err}
 	}
-	return s.query(ctx, req, query)
+	queryReq, err := reqToQueryReq(req, query)
+	if err != nil {
+		return queryResult{res: telemPb.QueryResponse{}, err: err}
+	}
+	return s.query(ctx, queryReq)
 }

 func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) {
@@ -372,15 +377,24 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) {
 		return nil, err
 	}

+	// omit query string, we'll fill it in later
+	queryReq, err := reqToQueryReq(req, "")
+	if err != nil {
+		return nil, err
+	}
+
 	results := make(chan queryResultWithLabel)

 	// kick off requests
 	for quantile, label := range quantileMap {
 		go func(quantile string, label pb.HistogramLabel) {
-			q := fmt.Sprintf(quantileQuery, quantile, query)
+			// copy queryReq, gets us StartMs, EndMs, and Step
+			qr := queryReq
+			// insert our quantile-specific query
+			qr.Query = fmt.Sprintf(quantileQuery, quantile, query)

 			results <- queryResultWithLabel{
-				queryResult: s.query(ctx, req, q),
+				queryResult: s.query(ctx, qr),
 				label:       label,
 			}
 		}(quantile, label)
@@ -401,24 +415,8 @@ func (s *grpcServer) queryLatency(ctx context.Context, req *pb.MetricRequest) (map[pb.HistogramLabel]telemPb.QueryResponse, error) {
 	return queryRsps, err
 }

-func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) queryResult {
-	queryReq := &telemPb.QueryRequest{
-		Query: query,
-	}
-
-	start, end, step, err := queryParams(req)
-	if err != nil {
-		return queryResult{res: telemPb.QueryResponse{}, err: err}
-	}
-
-	// EndMs always required to ensure deterministic timestamps
-	queryReq.EndMs = end
-	if !req.Summarize {
-		queryReq.StartMs = start
-		queryReq.Step = step
-	}
-
-	queryRsp, err := s.telemetryClient.Query(ctx, queryReq)
+func (s *grpcServer) query(ctx context.Context, queryReq telemPb.QueryRequest) queryResult {
+	queryRsp, err := s.telemetryClient.Query(ctx, &queryReq)
 	if err != nil {
 		return queryResult{res: telemPb.QueryResponse{}, err: err}
 	}
@@ -426,6 +424,26 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query string) queryResult {
 	return queryResult{res: *queryRsp, err: nil}
 }

+func reqToQueryReq(req *pb.MetricRequest, query string) (telemPb.QueryRequest, error) {
+	start, end, step, err := queryParams(req)
+	if err != nil {
+		return telemPb.QueryRequest{}, err
+	}
+
+	// EndMs always required to ensure deterministic timestamps
+	queryReq := telemPb.QueryRequest{
+		Query: query,
+		EndMs: end,
+	}
+
+	if !req.Summarize {
+		queryReq.StartMs = start
+		queryReq.Step = step
+	}
+
+	return queryReq, nil
+}
+
 func formatQuery(query string, req *pb.MetricRequest, sumBy string) (string, error) {
 	sumLabels := make([]string, 0)
 	filterLabels := make([]string, 0)

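The heart of the fix above: reqToQueryReq builds the request once,
and each goroutine copies the struct, so StartMs, EndMs, and Step
stay identical across quantiles while only Query varies. A minimal
sketch of that copy-then-specialize pattern, using a simplified
stand-in for telemPb.QueryRequest (field types and the query string
here are assumptions, not the repo's):

package main

import "fmt"

// QueryRequest is a simplified stand-in for telemPb.QueryRequest;
// field names mirror the diff, field types are assumptions.
type QueryRequest struct {
	Query   string
	StartMs int64
	EndMs   int64
	Step    string
}

func main() {
	// Parse the time window once; StartMs/EndMs/Step are now fixed.
	base := QueryRequest{StartMs: 1000, EndMs: 2000, Step: "10s"}

	for _, quantile := range []string{"0.5", "0.95", "0.99"} {
		qr := base // struct copy: every quantile shares the same timestamps
		qr.Query = fmt.Sprintf("histogram_quantile(%s, response_latency)", quantile) // illustrative query shape
		fmt.Printf("%+v\n", qr)
	}
}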

@@ -3,6 +3,8 @@ package public
 import (
 	"context"
 	"reflect"
+	"sort"
+	"sync/atomic"
 	"testing"

 	tap "github.com/runconduit/conduit/controller/gen/controller/tap"
@@ -17,10 +19,19 @@ type mockTelemetry struct {
 	client telemetry.TelemetryClient
 	tRes   *telemetry.QueryResponse
 	mReq   *pb.MetricRequest
+	ts     int64
 }

 // satisfies telemetry.TelemetryClient
 func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) {
+	if !atomic.CompareAndSwapInt64(&m.ts, 0, in.EndMs) {
+		ts := atomic.LoadInt64(&m.ts)
+		if ts != in.EndMs {
+			m.test.Errorf("Timestamp changed across queries: %+v / %+v / %+v", in, ts, in.EndMs)
+		}
+	}
+
 	if in.EndMs == 0 {
 		m.test.Errorf("EndMs not set in telemetry request: %+v", in)
 	}
@@ -33,6 +44,13 @@ func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequ
 	return nil, nil
 }

+// sorting results makes it easier to compare against expected output
+type ByHV []*pb.HistogramValue
+
+func (hv ByHV) Len() int           { return len(hv) }
+func (hv ByHV) Swap(i, j int)      { hv[i], hv[j] = hv[j], hv[i] }
+func (hv ByHV) Less(i, j int) bool { return hv[i].Label <= hv[j].Label }
+
 type testResponse struct {
 	tRes *telemetry.QueryResponse
 	mReq *pb.MetricRequest
@@ -114,6 +132,63 @@ func TestStat(t *testing.T) {
 				},
 			},
 		},
+		testResponse{
+			tRes: &telemetry.QueryResponse{
+				Metrics: []*telemetry.Sample{
+					&telemetry.Sample{
+						Values: []*telemetry.SampleValue{
+							&telemetry.SampleValue{Value: 1, TimestampMs: 2},
+						},
+						Labels: map[string]string{
+							sourceDeployLabel: "sourceDeployLabel",
+							targetDeployLabel: "targetDeployLabel",
+						},
+					},
+				},
+			},
+			mReq: &pb.MetricRequest{
+				Metrics: []pb.MetricName{
+					pb.MetricName_LATENCY,
+				},
+				Summarize: true,
+				Window:    pb.TimeWindow_TEN_MIN,
+			},
+			mRes: &pb.MetricResponse{
+				Metrics: []*pb.MetricSeries{
+					&pb.MetricSeries{
+						Name: pb.MetricName_LATENCY,
+						Metadata: &pb.MetricMetadata{
+							SourceDeploy: "sourceDeployLabel",
+							TargetDeploy: "targetDeployLabel",
+						},
+						Datapoints: []*pb.MetricDatapoint{
+							&pb.MetricDatapoint{
+								Value: &pb.MetricValue{Value: &pb.MetricValue_Histogram{
+									Histogram: &pb.Histogram{
+										Values: []*pb.HistogramValue{
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P50,
+												Value: 1,
+											},
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P95,
+												Value: 1,
+											},
+											&pb.HistogramValue{
+												Label: pb.HistogramLabel_P99,
+												Value: 1,
+											},
+										},
+									},
+								}},
+								TimestampMs: 2,
+							},
+						},
+					},
+				},
+			},
+		},
 	}

 	for _, tr := range responses {
@@ -124,6 +199,11 @@ func TestStat(t *testing.T) {
 			t.Fatalf("Unexpected error: %v", err)
 		}

+		switch res.Metrics[0].Name {
+		case pb.MetricName_LATENCY:
+			sort.Sort(ByHV(res.Metrics[0].Datapoints[0].Value.GetHistogram().Values))
+		}
+
 		if !reflect.DeepEqual(res, tr.mRes) {
 			t.Fatalf("Unexpected response:\n%+v\n!=\n%+v", res, tr.mRes)
 		}