mirror of https://github.com/linkerd/linkerd2.git
Require timestamp on all telemetry requests (#342)
PR #298 moved summary (non-timeseries) requests to Prometheus' Query endpoint, with no timestamp provided. This Query endpoint returns a single data point with whatever timestamp was provided in the request. In the absence of a timestamp, it uses current server time. This causes the Public API to return discrete data points with slightly different timestamps, which is unexpected behavior. Modify the Public API -> Telemetry -> Prometheus request path to always require a timestamp for single data point requests. Fixes #340 Signed-off-by: Andrew Seigner <siggy@buoyant.io>
This commit is contained in:
parent
4154db2d4f
commit
50f4aa57e5
|
@ -104,7 +104,7 @@ func (s *grpcServer) Stat(ctx context.Context, req *pb.MetricRequest) (*pb.Metri
|
||||||
for _ = range req.Metrics {
|
for _ = range req.Metrics {
|
||||||
result := <-resultsCh
|
result := <-resultsCh
|
||||||
if result.err != nil {
|
if result.err != nil {
|
||||||
log.Errorf("Stat -> queryMetric failed with: %s", err)
|
log.Errorf("Stat -> queryMetric failed with: %s", result.err)
|
||||||
err = result.err
|
err = result.err
|
||||||
} else {
|
} else {
|
||||||
for i := range result.series {
|
for i := range result.series {
|
||||||
|
@ -406,14 +406,15 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str
|
||||||
Query: query,
|
Query: query,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !req.Summarize {
|
start, end, step, err := queryParams(req)
|
||||||
start, end, step, err := queryParams(req)
|
if err != nil {
|
||||||
if err != nil {
|
return queryResult{res: telemPb.QueryResponse{}, err: err}
|
||||||
return queryResult{res: telemPb.QueryResponse{}, err: err}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
// EndMs always required to ensure deterministic timestamps
|
||||||
|
queryReq.EndMs = end
|
||||||
|
if !req.Summarize {
|
||||||
queryReq.StartMs = start
|
queryReq.StartMs = start
|
||||||
queryReq.EndMs = end
|
|
||||||
queryReq.Step = step
|
queryReq.Step = step
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,13 +13,21 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockTelemetry struct {
|
type mockTelemetry struct {
|
||||||
|
test *testing.T
|
||||||
client telemetry.TelemetryClient
|
client telemetry.TelemetryClient
|
||||||
res *telemetry.QueryResponse
|
tRes *telemetry.QueryResponse
|
||||||
|
mReq *pb.MetricRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
// satisfies telemetry.TelemetryClient
|
// satisfies telemetry.TelemetryClient
|
||||||
func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) {
|
func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) {
|
||||||
return m.res, nil
|
if in.EndMs == 0 {
|
||||||
|
m.test.Errorf("EndMs not set in telemetry request: %+v", in)
|
||||||
|
}
|
||||||
|
if !m.mReq.Summarize && (in.StartMs == 0 || in.Step == "") {
|
||||||
|
m.test.Errorf("Range params not set in timeseries request: %+v", in)
|
||||||
|
}
|
||||||
|
return m.tRes, nil
|
||||||
}
|
}
|
||||||
func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequest, opts ...grpc.CallOption) (*conduit_public.ListPodsResponse, error) {
|
func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequest, opts ...grpc.CallOption) (*conduit_public.ListPodsResponse, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -64,6 +72,8 @@ func TestStat(t *testing.T) {
|
||||||
Metrics: []pb.MetricName{
|
Metrics: []pb.MetricName{
|
||||||
pb.MetricName_REQUEST_RATE,
|
pb.MetricName_REQUEST_RATE,
|
||||||
},
|
},
|
||||||
|
Summarize: true,
|
||||||
|
Window: pb.TimeWindow_TEN_MIN,
|
||||||
},
|
},
|
||||||
mRes: &pb.MetricResponse{
|
mRes: &pb.MetricResponse{
|
||||||
Metrics: []*pb.MetricSeries{
|
Metrics: []*pb.MetricSeries{
|
||||||
|
@ -107,7 +117,7 @@ func TestStat(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tr := range responses {
|
for _, tr := range responses {
|
||||||
s := newGrpcServer(&mockTelemetry{res: tr.tRes}, tap.NewTapClient(nil))
|
s := newGrpcServer(&mockTelemetry{test: t, tRes: tr.tRes, mReq: tr.mReq}, tap.NewTapClient(nil))
|
||||||
|
|
||||||
res, err := s.Stat(context.Background(), tr.mReq)
|
res, err := s.Stat(context.Background(), tr.mReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -186,11 +186,17 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query
|
||||||
|
|
||||||
samples := make([]*read.Sample, 0)
|
samples := make([]*read.Sample, 0)
|
||||||
|
|
||||||
|
if req.EndMs == 0 {
|
||||||
|
err := fmt.Errorf("EndMs timestamp missing from request: %+v", req)
|
||||||
|
log.Errorf("%s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
end := time.Unix(0, req.EndMs*int64(time.Millisecond))
|
||||||
|
|
||||||
if req.StartMs != 0 && req.EndMs != 0 && req.Step != "" {
|
if req.StartMs != 0 && req.EndMs != 0 && req.Step != "" {
|
||||||
// timeseries query
|
// timeseries query
|
||||||
|
|
||||||
start := time.Unix(0, req.StartMs*int64(time.Millisecond))
|
start := time.Unix(0, req.StartMs*int64(time.Millisecond))
|
||||||
end := time.Unix(0, req.EndMs*int64(time.Millisecond))
|
|
||||||
step, err := time.ParseDuration(req.Step)
|
step, err := time.ParseDuration(req.Step)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("ParseDuration(%+v) failed with: %+v", req.Step, err)
|
log.Errorf("ParseDuration(%+v) failed with: %+v", req.Step, err)
|
||||||
|
@ -221,9 +227,9 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query
|
||||||
} else {
|
} else {
|
||||||
// single data point (aka summary) query
|
// single data point (aka summary) query
|
||||||
|
|
||||||
res, err := s.prometheusAPI.Query(ctx, req.Query, time.Time{})
|
res, err := s.prometheusAPI.Query(ctx, req.Query, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Query(%+v, %+v) failed with: %+v", req.Query, time.Time{}, err)
|
log.Errorf("Query(%+v, %+v) failed with: %+v", req.Query, end, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
log.Debugf("Query response: %+v", res)
|
log.Debugf("Query response: %+v", res)
|
||||||
|
|
|
@ -41,11 +41,20 @@ type testResponse struct {
|
||||||
func TestServerResponses(t *testing.T) {
|
func TestServerResponses(t *testing.T) {
|
||||||
responses := []testResponse{
|
responses := []testResponse{
|
||||||
|
|
||||||
|
testResponse{
|
||||||
|
err: errors.New("EndMs timestamp missing from request: query:\"fake query0\" "),
|
||||||
|
promRes: &model.Scalar{},
|
||||||
|
queryReq: &read.QueryRequest{
|
||||||
|
Query: "fake query0",
|
||||||
|
},
|
||||||
|
queryRes: nil,
|
||||||
|
},
|
||||||
testResponse{
|
testResponse{
|
||||||
err: errors.New("Unexpected query result type (expected Vector): scalar"),
|
err: errors.New("Unexpected query result type (expected Vector): scalar"),
|
||||||
promRes: &model.Scalar{},
|
promRes: &model.Scalar{},
|
||||||
queryReq: &read.QueryRequest{
|
queryReq: &read.QueryRequest{
|
||||||
Query: "fake query",
|
Query: "fake query1",
|
||||||
|
EndMs: 1,
|
||||||
},
|
},
|
||||||
queryRes: nil,
|
queryRes: nil,
|
||||||
},
|
},
|
||||||
|
@ -53,7 +62,8 @@ func TestServerResponses(t *testing.T) {
|
||||||
err: errors.New("Unexpected query result type (expected Vector): matrix"),
|
err: errors.New("Unexpected query result type (expected Vector): matrix"),
|
||||||
promRes: model.Matrix{},
|
promRes: model.Matrix{},
|
||||||
queryReq: &read.QueryRequest{
|
queryReq: &read.QueryRequest{
|
||||||
Query: "fake query",
|
Query: "fake query2",
|
||||||
|
EndMs: 1,
|
||||||
},
|
},
|
||||||
queryRes: nil,
|
queryRes: nil,
|
||||||
},
|
},
|
||||||
|
@ -61,7 +71,7 @@ func TestServerResponses(t *testing.T) {
|
||||||
err: errors.New("Unexpected query result type (expected Matrix): vector"),
|
err: errors.New("Unexpected query result type (expected Matrix): vector"),
|
||||||
promRes: model.Vector{},
|
promRes: model.Vector{},
|
||||||
queryReq: &read.QueryRequest{
|
queryReq: &read.QueryRequest{
|
||||||
Query: "fake query",
|
Query: "fake query3",
|
||||||
StartMs: 1,
|
StartMs: 1,
|
||||||
EndMs: 2,
|
EndMs: 2,
|
||||||
Step: "10s",
|
Step: "10s",
|
||||||
|
@ -83,7 +93,8 @@ func TestServerResponses(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
queryReq: &read.QueryRequest{
|
queryReq: &read.QueryRequest{
|
||||||
Query: "fake query",
|
Query: "fake query4",
|
||||||
|
EndMs: 1,
|
||||||
},
|
},
|
||||||
queryRes: &read.QueryResponse{
|
queryRes: &read.QueryResponse{
|
||||||
Metrics: []*read.Sample{
|
Metrics: []*read.Sample{
|
||||||
|
@ -117,7 +128,7 @@ func TestServerResponses(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
queryReq: &read.QueryRequest{
|
queryReq: &read.QueryRequest{
|
||||||
Query: "fake query",
|
Query: "fake query5",
|
||||||
StartMs: 1,
|
StartMs: 1,
|
||||||
EndMs: 2,
|
EndMs: 2,
|
||||||
Step: "10s",
|
Step: "10s",
|
||||||
|
|
|
@ -14,9 +14,17 @@ service Telemetry {
|
||||||
}
|
}
|
||||||
|
|
||||||
message QueryRequest {
|
message QueryRequest {
|
||||||
|
// required
|
||||||
string query = 1;
|
string query = 1;
|
||||||
|
|
||||||
|
// required for timeseries queries
|
||||||
int64 start_ms = 2;
|
int64 start_ms = 2;
|
||||||
|
|
||||||
|
// required for timeseries queries
|
||||||
|
// also required for single data point queries: the server rejects requests with end_ms unset, so results have deterministic timestamps
|
||||||
int64 end_ms = 3;
|
int64 end_ms = 3;
|
||||||
|
|
||||||
|
// required for timeseries queries
|
||||||
string step = 4;
|
string step = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue