From 50f4aa57e5e6bc5aa61cedf3fc4926ea8e39eed2 Mon Sep 17 00:00:00 2001 From: Andrew Seigner Date: Tue, 13 Feb 2018 13:52:21 -0800 Subject: [PATCH] Require timestamp on all telemetry requests (#342) PR #298 moved summary (non-timeseries) requests to Prometheus' Query endpoint, with no timestamp provided. This Query endpoint returns a single data point with whatever timestamp was provided in the request. In the absense of a timestamp, it uses current server time. This causes the Public API to return discreet data points with slightly different timestamps, which is unexpected behavior. Modify the Public API -> Telemetry -> Prometheus request path to always require a timestamp for single data point requests. Fixes #340 Signed-off-by: Andrew Seigner --- controller/api/public/grpc_server.go | 15 ++++++++------- controller/api/public/grpc_server_test.go | 16 +++++++++++++--- controller/telemetry/server.go | 12 +++++++++--- controller/telemetry/server_test.go | 21 ++++++++++++++++----- proto/controller/telemetry/telemetry.proto | 8 ++++++++ 5 files changed, 54 insertions(+), 18 deletions(-) diff --git a/controller/api/public/grpc_server.go b/controller/api/public/grpc_server.go index 6eae72818..0f58f719a 100644 --- a/controller/api/public/grpc_server.go +++ b/controller/api/public/grpc_server.go @@ -104,7 +104,7 @@ func (s *grpcServer) Stat(ctx context.Context, req *pb.MetricRequest) (*pb.Metri for _ = range req.Metrics { result := <-resultsCh if result.err != nil { - log.Errorf("Stat -> queryMetric failed with: %s", err) + log.Errorf("Stat -> queryMetric failed with: %s", result.err) err = result.err } else { for i := range result.series { @@ -406,14 +406,15 @@ func (s *grpcServer) query(ctx context.Context, req *pb.MetricRequest, query str Query: query, } - if !req.Summarize { - start, end, step, err := queryParams(req) - if err != nil { - return queryResult{res: telemPb.QueryResponse{}, err: err} - } + start, end, step, err := queryParams(req) + if err != nil { + return queryResult{res: telemPb.QueryResponse{}, err: err} + } + // EndMs always required to ensure deterministic timestamps + queryReq.EndMs = end + if !req.Summarize { queryReq.StartMs = start - queryReq.EndMs = end queryReq.Step = step } diff --git a/controller/api/public/grpc_server_test.go b/controller/api/public/grpc_server_test.go index 86c287002..50c22e366 100644 --- a/controller/api/public/grpc_server_test.go +++ b/controller/api/public/grpc_server_test.go @@ -13,13 +13,21 @@ import ( ) type mockTelemetry struct { + test *testing.T client telemetry.TelemetryClient - res *telemetry.QueryResponse + tRes *telemetry.QueryResponse + mReq *pb.MetricRequest } // satisfies telemetry.TelemetryClient func (m *mockTelemetry) Query(ctx context.Context, in *telemetry.QueryRequest, opts ...grpc.CallOption) (*telemetry.QueryResponse, error) { - return m.res, nil + if in.EndMs == 0 { + m.test.Errorf("EndMs not set in telemetry request: %+v", in) + } + if !m.mReq.Summarize && (in.StartMs == 0 || in.Step == "") { + m.test.Errorf("Range params not set in timeseries request: %+v", in) + } + return m.tRes, nil } func (m *mockTelemetry) ListPods(ctx context.Context, in *telemetry.ListPodsRequest, opts ...grpc.CallOption) (*conduit_public.ListPodsResponse, error) { return nil, nil @@ -64,6 +72,8 @@ func TestStat(t *testing.T) { Metrics: []pb.MetricName{ pb.MetricName_REQUEST_RATE, }, + Summarize: true, + Window: pb.TimeWindow_TEN_MIN, }, mRes: &pb.MetricResponse{ Metrics: []*pb.MetricSeries{ @@ -107,7 +117,7 @@ func TestStat(t *testing.T) { } for _, tr := range responses { - s := newGrpcServer(&mockTelemetry{res: tr.tRes}, tap.NewTapClient(nil)) + s := newGrpcServer(&mockTelemetry{test: t, tRes: tr.tRes, mReq: tr.mReq}, tap.NewTapClient(nil)) res, err := s.Stat(context.Background(), tr.mReq) if err != nil { diff --git a/controller/telemetry/server.go b/controller/telemetry/server.go index 6f067810d..f91e06a94 100644 --- a/controller/telemetry/server.go +++ b/controller/telemetry/server.go @@ -186,11 +186,17 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query samples := make([]*read.Sample, 0) + if req.EndMs == 0 { + err := fmt.Errorf("EndMs timestamp missing from request: %+v", req) + log.Errorf("%s", err) + return nil, err + } + end := time.Unix(0, req.EndMs*int64(time.Millisecond)) + if req.StartMs != 0 && req.EndMs != 0 && req.Step != "" { // timeseries query start := time.Unix(0, req.StartMs*int64(time.Millisecond)) - end := time.Unix(0, req.EndMs*int64(time.Millisecond)) step, err := time.ParseDuration(req.Step) if err != nil { log.Errorf("ParseDuration(%+v) failed with: %+v", req.Step, err) @@ -221,9 +227,9 @@ func (s *server) Query(ctx context.Context, req *read.QueryRequest) (*read.Query } else { // single data point (aka summary) query - res, err := s.prometheusAPI.Query(ctx, req.Query, time.Time{}) + res, err := s.prometheusAPI.Query(ctx, req.Query, end) if err != nil { - log.Errorf("Query(%+v, %+v) failed with: %+v", req.Query, time.Time{}, err) + log.Errorf("Query(%+v, %+v) failed with: %+v", req.Query, end, err) return nil, err } log.Debugf("Query response: %+v", res) diff --git a/controller/telemetry/server_test.go b/controller/telemetry/server_test.go index 008558f4e..c2e7a1977 100644 --- a/controller/telemetry/server_test.go +++ b/controller/telemetry/server_test.go @@ -41,11 +41,20 @@ type testResponse struct { func TestServerResponses(t *testing.T) { responses := []testResponse{ + testResponse{ + err: errors.New("EndMs timestamp missing from request: query:\"fake query0\" "), + promRes: &model.Scalar{}, + queryReq: &read.QueryRequest{ + Query: "fake query0", + }, + queryRes: nil, + }, testResponse{ err: errors.New("Unexpected query result type (expected Vector): scalar"), promRes: &model.Scalar{}, queryReq: &read.QueryRequest{ - Query: "fake query", + Query: "fake query1", + EndMs: 1, }, queryRes: nil, }, @@ -53,7 +62,8 @@ func TestServerResponses(t *testing.T) { err: errors.New("Unexpected query result type (expected Vector): matrix"), promRes: model.Matrix{}, queryReq: &read.QueryRequest{ - Query: "fake query", + Query: "fake query2", + EndMs: 1, }, queryRes: nil, }, @@ -61,7 +71,7 @@ func TestServerResponses(t *testing.T) { err: errors.New("Unexpected query result type (expected Matrix): vector"), promRes: model.Vector{}, queryReq: &read.QueryRequest{ - Query: "fake query", + Query: "fake query3", StartMs: 1, EndMs: 2, Step: "10s", @@ -83,7 +93,8 @@ func TestServerResponses(t *testing.T) { }, }, queryReq: &read.QueryRequest{ - Query: "fake query", + Query: "fake query4", + EndMs: 1, }, queryRes: &read.QueryResponse{ Metrics: []*read.Sample{ @@ -117,7 +128,7 @@ func TestServerResponses(t *testing.T) { }, }, queryReq: &read.QueryRequest{ - Query: "fake query", + Query: "fake query5", StartMs: 1, EndMs: 2, Step: "10s", diff --git a/proto/controller/telemetry/telemetry.proto b/proto/controller/telemetry/telemetry.proto index 20b427ff6..275cb3bab 100644 --- a/proto/controller/telemetry/telemetry.proto +++ b/proto/controller/telemetry/telemetry.proto @@ -14,9 +14,17 @@ service Telemetry { } message QueryRequest { + // required string query = 1; + + // required for timeseries queries int64 start_ms = 2; + + // required for timeseries queries + // optional for single data point, but if unset, results will have non-deterministic timestamps int64 end_ms = 3; + + // required for timeseries queries string step = 4; }