mirror of https://github.com/linkerd/linkerd2.git
Store proxy latencies in a structure that matches controller histogram (#11)
The proxy currently stores latency values in an `OrderMap` and reports every observed latency value to the controller's telemetry API since the last report. The telemetry API then sends each individual value to Prometheus. This doesn't scale well when there are a large number of proxies making reports. I've modified the proxy to use a fixed-size histogram that matches the histogram buckets in Prometheus. Each report now includes an array indicating the histogram bounds, and each response scope contains a set of counts corresponding to each index in the bounds array, indicating the number of times a latency in that bucket was observed. The controller then reports the upper bound of each bucket to Prometheus, and can use the proxy's reported set of bucket bounds so that the observed values will be correct even if the bounds in the control plane are changed independently of those set in the proxy. I've also modified `simulate-proxy` to generate the new report structure, and added tests in the proxy's telemetry test suite validating the new behaviour.
This commit is contained in:
parent
fbb4e812f8
commit
915f08ac4c
|
@ -15,7 +15,6 @@ It has these top-level messages:
|
|||
TransportSummary
|
||||
RequestScope
|
||||
RequestCtx
|
||||
Latency
|
||||
ResponseScope
|
||||
ResponseCtx
|
||||
EosScope
|
||||
|
@ -73,6 +72,15 @@ type ReportRequest struct {
|
|||
ServerTransports []*ServerTransport `protobuf:"bytes,3,rep,name=server_transports,json=serverTransports" json:"server_transports,omitempty"`
|
||||
ClientTransports []*ClientTransport `protobuf:"bytes,4,rep,name=client_transports,json=clientTransports" json:"client_transports,omitempty"`
|
||||
Requests []*RequestScope `protobuf:"bytes,5,rep,name=requests" json:"requests,omitempty"`
|
||||
// The inclusive upper bound of each bucket in the response latency histogram,
|
||||
// in tenths of a millisecond.
|
||||
//
|
||||
// Each ResponseScope message will contain an array of numbers representing
|
||||
// the number of observed response latencies in each bucket of the latency
|
||||
// histogram. Since the structure of the latency histogram will be the same
|
||||
// across all ResponseScopes, we only need to report the max values for these
|
||||
// buckets a single time.
|
||||
HistogramBucketBoundsTenthMs []uint32 `protobuf:"varint,6,rep,packed,name=histogram_bucket_bounds_tenth_ms,json=histogramBucketBoundsTenthMs" json:"histogram_bucket_bounds_tenth_ms,omitempty"`
|
||||
}
|
||||
|
||||
func (m *ReportRequest) Reset() { *m = ReportRequest{} }
|
||||
|
@ -115,6 +123,13 @@ func (m *ReportRequest) GetRequests() []*RequestScope {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *ReportRequest) GetHistogramBucketBoundsTenthMs() []uint32 {
|
||||
if m != nil {
|
||||
return m.HistogramBucketBoundsTenthMs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Process struct {
|
||||
Node string `protobuf:"bytes,1,opt,name=node" json:"node,omitempty"`
|
||||
ScheduledInstance string `protobuf:"bytes,2,opt,name=scheduled_instance,json=scheduledInstance" json:"scheduled_instance,omitempty"`
|
||||
|
@ -331,47 +346,21 @@ func (m *RequestCtx) GetPath() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// A latency value in tenths of a millisecond and a count of the times
|
||||
// that latency was observed.
|
||||
type Latency struct {
|
||||
// Latency value in tenths of a millisecond.
|
||||
Latency uint32 `protobuf:"varint,1,opt,name=latency" json:"latency,omitempty"`
|
||||
// Count of occurences of this latency value.
|
||||
Count uint32 `protobuf:"varint,2,opt,name=count" json:"count,omitempty"`
|
||||
}
|
||||
|
||||
func (m *Latency) Reset() { *m = Latency{} }
|
||||
func (m *Latency) String() string { return proto.CompactTextString(m) }
|
||||
func (*Latency) ProtoMessage() {}
|
||||
func (*Latency) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
|
||||
|
||||
func (m *Latency) GetLatency() uint32 {
|
||||
if m != nil {
|
||||
return m.Latency
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Latency) GetCount() uint32 {
|
||||
if m != nil {
|
||||
return m.Count
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type ResponseScope struct {
|
||||
Ctx *ResponseCtx `protobuf:"bytes,1,opt,name=ctx" json:"ctx,omitempty"`
|
||||
// Response latencies (time from request headers sent to response headers received),
|
||||
// represented as an array of observed latency values with precision to 100µs and
|
||||
// the number of times those values were observed, ordered by the latency value.
|
||||
ResponseLatencies []*Latency `protobuf:"bytes,2,rep,name=response_latencies,json=responseLatencies" json:"response_latencies,omitempty"`
|
||||
// Response latencies (time from request headers sent to response headers
|
||||
// received). Represented as a histogram with buckets whose inclusive
|
||||
// upper bounds are given in the `histogram_bucket_bounds_tenths_ms` array in
|
||||
// `ReportRequest`. Each number in this array represents the number of times a
|
||||
// latency falling into that bucket was observed.
|
||||
ResponseLatencyCounts []uint32 `protobuf:"varint,2,rep,packed,name=response_latency_counts,json=responseLatencyCounts" json:"response_latency_counts,omitempty"`
|
||||
Ends []*EosScope `protobuf:"bytes,3,rep,name=ends" json:"ends,omitempty"`
|
||||
}
|
||||
|
||||
func (m *ResponseScope) Reset() { *m = ResponseScope{} }
|
||||
func (m *ResponseScope) String() string { return proto.CompactTextString(m) }
|
||||
func (*ResponseScope) ProtoMessage() {}
|
||||
func (*ResponseScope) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
|
||||
func (*ResponseScope) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
|
||||
|
||||
func (m *ResponseScope) GetCtx() *ResponseCtx {
|
||||
if m != nil {
|
||||
|
@ -380,9 +369,9 @@ func (m *ResponseScope) GetCtx() *ResponseCtx {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *ResponseScope) GetResponseLatencies() []*Latency {
|
||||
func (m *ResponseScope) GetResponseLatencyCounts() []uint32 {
|
||||
if m != nil {
|
||||
return m.ResponseLatencies
|
||||
return m.ResponseLatencyCounts
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -401,7 +390,7 @@ type ResponseCtx struct {
|
|||
func (m *ResponseCtx) Reset() { *m = ResponseCtx{} }
|
||||
func (m *ResponseCtx) String() string { return proto.CompactTextString(m) }
|
||||
func (*ResponseCtx) ProtoMessage() {}
|
||||
func (*ResponseCtx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
|
||||
func (*ResponseCtx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
|
||||
|
||||
func (m *ResponseCtx) GetHttpStatusCode() uint32 {
|
||||
if m != nil {
|
||||
|
@ -418,7 +407,7 @@ type EosScope struct {
|
|||
func (m *EosScope) Reset() { *m = EosScope{} }
|
||||
func (m *EosScope) String() string { return proto.CompactTextString(m) }
|
||||
func (*EosScope) ProtoMessage() {}
|
||||
func (*EosScope) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
|
||||
func (*EosScope) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
|
||||
|
||||
func (m *EosScope) GetCtx() *EosCtx {
|
||||
if m != nil {
|
||||
|
@ -445,7 +434,7 @@ type EosCtx struct {
|
|||
func (m *EosCtx) Reset() { *m = EosCtx{} }
|
||||
func (m *EosCtx) String() string { return proto.CompactTextString(m) }
|
||||
func (*EosCtx) ProtoMessage() {}
|
||||
func (*EosCtx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
|
||||
func (*EosCtx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
|
||||
|
||||
type isEosCtx_End interface {
|
||||
isEosCtx_End()
|
||||
|
@ -584,7 +573,7 @@ type StreamSummary struct {
|
|||
func (m *StreamSummary) Reset() { *m = StreamSummary{} }
|
||||
func (m *StreamSummary) String() string { return proto.CompactTextString(m) }
|
||||
func (*StreamSummary) ProtoMessage() {}
|
||||
func (*StreamSummary) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
|
||||
func (*StreamSummary) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
|
||||
|
||||
func (m *StreamSummary) GetDurationMs() uint64 {
|
||||
if m != nil {
|
||||
|
@ -613,7 +602,7 @@ type ReportResponse struct {
|
|||
func (m *ReportResponse) Reset() { *m = ReportResponse{} }
|
||||
func (m *ReportResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*ReportResponse) ProtoMessage() {}
|
||||
func (*ReportResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
|
||||
func (*ReportResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*ReportRequest)(nil), "conduit.proxy.telemetry.ReportRequest")
|
||||
|
@ -623,7 +612,6 @@ func init() {
|
|||
proto.RegisterType((*TransportSummary)(nil), "conduit.proxy.telemetry.TransportSummary")
|
||||
proto.RegisterType((*RequestScope)(nil), "conduit.proxy.telemetry.RequestScope")
|
||||
proto.RegisterType((*RequestCtx)(nil), "conduit.proxy.telemetry.RequestCtx")
|
||||
proto.RegisterType((*Latency)(nil), "conduit.proxy.telemetry.Latency")
|
||||
proto.RegisterType((*ResponseScope)(nil), "conduit.proxy.telemetry.ResponseScope")
|
||||
proto.RegisterType((*ResponseCtx)(nil), "conduit.proxy.telemetry.ResponseCtx")
|
||||
proto.RegisterType((*EosScope)(nil), "conduit.proxy.telemetry.EosScope")
|
||||
|
@ -708,61 +696,62 @@ var _Telemetry_serviceDesc = grpc.ServiceDesc{
|
|||
func init() { proto.RegisterFile("proxy/telemetry/telemetry.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 888 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0x5f, 0x6f, 0x23, 0x35,
|
||||
0x10, 0xef, 0x36, 0x49, 0x93, 0x4c, 0x2e, 0xbd, 0xd4, 0x87, 0x60, 0xa9, 0x40, 0x2d, 0x0b, 0x1c,
|
||||
0x01, 0x41, 0x2a, 0x02, 0x77, 0x08, 0x78, 0xe1, 0xda, 0x3b, 0xa9, 0x15, 0x77, 0x6d, 0xe5, 0xb4,
|
||||
0x4f, 0x3c, 0xac, 0xf6, 0xbc, 0xc3, 0x25, 0x52, 0xd6, 0x5e, 0x6c, 0x2f, 0x6a, 0x1e, 0xd0, 0x7d,
|
||||
0x16, 0x24, 0xbe, 0x0f, 0x1f, 0x03, 0x5e, 0xf8, 0x0e, 0xc8, 0xf6, 0x7a, 0xb3, 0x0d, 0x97, 0x96,
|
||||
0x3f, 0x2f, 0x3c, 0xc5, 0x9e, 0xf9, 0xcd, 0xcf, 0x33, 0xbf, 0x19, 0x7b, 0x03, 0x7b, 0xb9, 0x14,
|
||||
0x57, 0x8b, 0x03, 0x8d, 0x73, 0xcc, 0x50, 0xcb, 0xda, 0x6a, 0x94, 0x4b, 0xa1, 0x05, 0x79, 0x83,
|
||||
0x09, 0x9e, 0x16, 0x33, 0x3d, 0xb2, 0xc0, 0x51, 0xe5, 0xde, 0xbd, 0xc7, 0x44, 0x96, 0x09, 0x7e,
|
||||
0xe0, 0x7e, 0x1c, 0x3a, 0xfa, 0xa5, 0x01, 0x7d, 0x8a, 0xb9, 0x90, 0x9a, 0xe2, 0x0f, 0x05, 0x2a,
|
||||
0x4d, 0xbe, 0x82, 0x76, 0x2e, 0x05, 0x43, 0xa5, 0xc2, 0x60, 0x3f, 0x18, 0xf6, 0xc6, 0xfb, 0xa3,
|
||||
0x35, 0x8c, 0xa3, 0x73, 0x87, 0xa3, 0x3e, 0x80, 0x1c, 0x42, 0xcb, 0x62, 0xc2, 0xcd, 0xfd, 0x60,
|
||||
0xb8, 0x3d, 0xfe, 0x78, 0x6d, 0xe4, 0xb5, 0x23, 0x0d, 0xcf, 0xd5, 0x82, 0xba, 0x50, 0x72, 0x09,
|
||||
0x3b, 0x0a, 0xe5, 0x8f, 0x28, 0x63, 0x2d, 0x13, 0xae, 0x0c, 0x4e, 0x85, 0x8d, 0xfd, 0xc6, 0xb0,
|
||||
0x37, 0x1e, 0xae, 0xe5, 0x9b, 0xd8, 0x88, 0x0b, 0x1f, 0x40, 0x07, 0xea, 0xba, 0x41, 0x19, 0x5a,
|
||||
0x36, 0x9f, 0x21, 0xd7, 0x75, 0xda, 0xe6, 0x2d, 0xb4, 0x47, 0x36, 0xa2, 0x46, 0xcb, 0xae, 0x1b,
|
||||
0x14, 0x79, 0x04, 0x1d, 0xe9, 0xaa, 0x50, 0x61, 0xcb, 0xb2, 0xbd, 0x7f, 0x43, 0xd1, 0x16, 0x38,
|
||||
0x61, 0x22, 0x47, 0x5a, 0x85, 0x45, 0x11, 0xb4, 0xac, 0x00, 0xa4, 0x07, 0xed, 0x93, 0xd3, 0xc3,
|
||||
0xb3, 0xcb, 0xd3, 0xc7, 0x83, 0x0d, 0x72, 0x07, 0x3a, 0x67, 0x97, 0x17, 0x6e, 0x17, 0x44, 0x3f,
|
||||
0x41, 0xbb, 0x14, 0x9b, 0x10, 0x68, 0x72, 0x91, 0xa2, 0x6d, 0x4e, 0x97, 0xda, 0x35, 0xf9, 0x04,
|
||||
0x88, 0x62, 0x53, 0x4c, 0x8b, 0x39, 0xa6, 0xf1, 0x8c, 0x2b, 0x9d, 0x70, 0x86, 0xb6, 0x09, 0x5d,
|
||||
0xba, 0x53, 0x79, 0x4e, 0x4a, 0x07, 0x39, 0x80, 0x7b, 0x4b, 0x38, 0x4f, 0x32, 0x54, 0x79, 0xc2,
|
||||
0x30, 0x6c, 0x58, 0xfc, 0x92, 0xe9, 0xd4, 0x7b, 0xa2, 0xdf, 0x03, 0xb8, 0xbb, 0x22, 0x31, 0x79,
|
||||
0x08, 0x5d, 0x25, 0x0a, 0xc9, 0x30, 0x9e, 0xe5, 0xe5, 0xa4, 0xbc, 0x59, 0x95, 0x5e, 0xce, 0xd8,
|
||||
0xc9, 0xf9, 0xa3, 0x34, 0x95, 0x66, 0x44, 0x3a, 0x0e, 0x7b, 0x92, 0x93, 0x5d, 0xe8, 0x30, 0xc1,
|
||||
0x39, 0x32, 0xad, 0x6c, 0x86, 0x7d, 0x5a, 0xed, 0xc9, 0xb7, 0xd0, 0x4b, 0x67, 0xaa, 0x72, 0xbb,
|
||||
0xae, 0x7f, 0xb8, 0x56, 0xd0, 0x2a, 0x99, 0x49, 0x91, 0x65, 0x89, 0x5c, 0xd0, 0x7a, 0x34, 0xf9,
|
||||
0x1c, 0x3a, 0x76, 0xc6, 0x99, 0x98, 0x87, 0x4d, 0x3b, 0x8f, 0xe1, 0x6a, 0x7e, 0xe7, 0xa5, 0x9f,
|
||||
0x56, 0xc8, 0xe8, 0x8f, 0x00, 0xee, 0xae, 0xb4, 0x9d, 0x7c, 0x0d, 0x3d, 0x9d, 0xc8, 0x17, 0xa8,
|
||||
0xe3, 0x24, 0x4d, 0x65, 0x59, 0xec, 0xee, 0x2a, 0xd9, 0x05, 0xcb, 0x7d, 0xb5, 0xe0, 0xe0, 0x66,
|
||||
0xfb, 0x7f, 0xaf, 0x97, 0xc2, 0x60, 0x95, 0x96, 0xec, 0x41, 0x2f, 0x2d, 0x64, 0xa2, 0x67, 0x82,
|
||||
0xc7, 0x99, 0x7b, 0x06, 0x9a, 0x14, 0xbc, 0xe9, 0x99, 0x22, 0x6f, 0x03, 0x3c, 0x5f, 0x68, 0x54,
|
||||
0xb1, 0x42, 0xae, 0x6d, 0x55, 0x4d, 0xda, 0xb5, 0x96, 0x09, 0x72, 0x1d, 0xfd, 0x1c, 0xc0, 0x9d,
|
||||
0xfa, 0xb0, 0x93, 0x07, 0xd0, 0x60, 0xfa, 0xaa, 0x14, 0xee, 0xdd, 0xdb, 0x2e, 0xc8, 0x91, 0xbe,
|
||||
0xa2, 0x06, 0x4f, 0x5e, 0x83, 0x16, 0x13, 0x45, 0x79, 0x42, 0x9f, 0xba, 0x0d, 0x79, 0x0c, 0x5d,
|
||||
0x89, 0x2a, 0x17, 0x5c, 0xa1, 0x97, 0xec, 0xfe, 0x0d, 0x94, 0x0e, 0xe9, 0x2e, 0xdd, 0x32, 0x30,
|
||||
0xfa, 0x2d, 0x00, 0x58, 0x9e, 0xf7, 0xaf, 0xa7, 0x79, 0x65, 0x34, 0x36, 0xff, 0xd1, 0x68, 0xbc,
|
||||
0x05, 0xdd, 0xa4, 0xd0, 0x53, 0x21, 0x67, 0x7a, 0x51, 0xde, 0xbe, 0xa5, 0x81, 0x8c, 0x61, 0x2b,
|
||||
0x43, 0x3d, 0x15, 0xa9, 0xed, 0xe6, 0x2b, 0x58, 0x8f, 0xb5, 0xce, 0x9f, 0x59, 0x04, 0x2d, 0x91,
|
||||
0xe6, 0x71, 0xc8, 0x13, 0x3d, 0x0d, 0x5b, 0xee, 0x71, 0x30, 0xeb, 0xe8, 0x4b, 0x68, 0x3f, 0x4d,
|
||||
0x34, 0x72, 0xb6, 0x20, 0x21, 0xb4, 0xe7, 0x6e, 0x69, 0x6b, 0xec, 0x53, 0xbf, 0x7d, 0xb5, 0xd4,
|
||||
0xd1, 0xaf, 0x81, 0xf9, 0x3a, 0xd4, 0x14, 0x24, 0x0f, 0xeb, 0x9d, 0x7c, 0xef, 0x56, 0xd9, 0xab,
|
||||
0x56, 0x9e, 0x01, 0xf1, 0xda, 0xc7, 0xee, 0xcc, 0x19, 0x9a, 0xfb, 0xd0, 0xb8, 0xf1, 0x03, 0x53,
|
||||
0xe6, 0x4d, 0x77, 0x7c, 0xec, 0x53, 0x1f, 0x4a, 0x1e, 0x40, 0x13, 0x79, 0xea, 0x07, 0xe0, 0x9d,
|
||||
0xb5, 0x14, 0x4f, 0x84, 0x72, 0xbd, 0xb7, 0xf0, 0xe8, 0x0b, 0xe8, 0xd5, 0x72, 0x23, 0x43, 0x18,
|
||||
0x4c, 0xb5, 0xce, 0x63, 0xa5, 0x13, 0x5d, 0xa8, 0x98, 0xf9, 0x87, 0xb5, 0x4f, 0xb7, 0x8d, 0x7d,
|
||||
0x62, 0xcd, 0x47, 0x22, 0xc5, 0xe8, 0x25, 0x74, 0x3c, 0x15, 0xf9, 0xb4, 0x2e, 0xc2, 0xde, 0x4d,
|
||||
0x47, 0x57, 0xf5, 0x7f, 0x03, 0x6d, 0xa5, 0x25, 0x26, 0x99, 0x2f, 0x7a, 0xfd, 0xc8, 0x4e, 0x2c,
|
||||
0xce, 0x5f, 0x71, 0x1f, 0x16, 0xbd, 0x84, 0x2d, 0x47, 0x48, 0x3e, 0x82, 0xc1, 0x0b, 0x99, 0xb3,
|
||||
0xbf, 0x26, 0x7d, 0xbc, 0x41, 0xb7, 0x8d, 0x67, 0x99, 0xb6, 0xc1, 0x4a, 0x54, 0xa8, 0x63, 0x94,
|
||||
0x52, 0x48, 0x87, 0xdd, 0xf4, 0x58, 0xeb, 0x79, 0x62, 0x1c, 0x16, 0xfb, 0x3a, 0xb4, 0x84, 0x9e,
|
||||
0xa2, 0xb4, 0xa3, 0xd8, 0x39, 0xde, 0xa0, 0x6e, 0x7b, 0xd8, 0x82, 0x06, 0xf2, 0x34, 0xca, 0xa1,
|
||||
0x7f, 0x2d, 0xb5, 0xff, 0xfa, 0x4c, 0x98, 0xf8, 0xef, 0xa5, 0xf9, 0xc6, 0x38, 0x7f, 0xc3, 0xea,
|
||||
0x0e, 0xce, 0x64, 0xdf, 0x91, 0x01, 0x6c, 0xfb, 0x3f, 0x0a, 0xae, 0x65, 0xe3, 0x29, 0x74, 0x2f,
|
||||
0xbc, 0x50, 0xe4, 0x3b, 0xd8, 0x72, 0x6e, 0x72, 0xff, 0xef, 0xfd, 0xd1, 0xd8, 0xfd, 0xe0, 0x56,
|
||||
0x9c, 0x3b, 0x27, 0xda, 0x78, 0xbe, 0x65, 0x5f, 0xc8, 0xcf, 0xfe, 0x0c, 0x00, 0x00, 0xff, 0xff,
|
||||
0x94, 0x14, 0x47, 0x5a, 0x70, 0x09, 0x00, 0x00,
|
||||
// 912 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x72, 0x1b, 0x35,
|
||||
0x14, 0x8e, 0xeb, 0x9f, 0xd8, 0xc7, 0x75, 0xea, 0xaa, 0x40, 0x97, 0x4c, 0x99, 0x98, 0x05, 0x8a,
|
||||
0x61, 0xc0, 0x19, 0x0c, 0x0d, 0x33, 0x70, 0x43, 0x9d, 0x96, 0x49, 0x06, 0x92, 0x66, 0x64, 0xe7,
|
||||
0x8a, 0x8b, 0x9d, 0x8d, 0xf6, 0x90, 0xf5, 0xe0, 0x95, 0x16, 0x49, 0xcb, 0xc4, 0x17, 0x4c, 0x9f,
|
||||
0x85, 0x97, 0xe0, 0x95, 0xe0, 0x02, 0xde, 0x81, 0x91, 0xb4, 0xbb, 0xde, 0x18, 0x9c, 0xf0, 0x73,
|
||||
0xc3, 0x95, 0xa5, 0xf3, 0x7d, 0xe7, 0x93, 0xce, 0x9f, 0xbc, 0xb0, 0x97, 0x4a, 0x71, 0xb5, 0xdc,
|
||||
0xd7, 0xb8, 0xc0, 0x04, 0xb5, 0xac, 0xac, 0x46, 0xa9, 0x14, 0x5a, 0x90, 0x87, 0x4c, 0xf0, 0x28,
|
||||
0x9b, 0xeb, 0x91, 0x25, 0x8e, 0x4a, 0x78, 0xf7, 0x01, 0x13, 0x49, 0x22, 0xf8, 0xbe, 0xfb, 0x71,
|
||||
0x6c, 0xff, 0xb7, 0x3a, 0xf4, 0x28, 0xa6, 0x42, 0x6a, 0x8a, 0xdf, 0x67, 0xa8, 0x34, 0xf9, 0x0c,
|
||||
0xb6, 0x53, 0x29, 0x18, 0x2a, 0xe5, 0xd5, 0x06, 0xb5, 0x61, 0x77, 0x3c, 0x18, 0x6d, 0x50, 0x1c,
|
||||
0x9d, 0x39, 0x1e, 0x2d, 0x1c, 0xc8, 0x04, 0x9a, 0x96, 0xe3, 0xdd, 0x19, 0xd4, 0x86, 0x3b, 0xe3,
|
||||
0x0f, 0x36, 0x7a, 0x5e, 0x3b, 0xd2, 0xe8, 0x5c, 0x2d, 0xa9, 0x73, 0x25, 0xe7, 0x70, 0x5f, 0xa1,
|
||||
0xfc, 0x01, 0x65, 0xa0, 0x65, 0xc8, 0x95, 0xe1, 0x29, 0xaf, 0x3e, 0xa8, 0x0f, 0xbb, 0xe3, 0xe1,
|
||||
0x46, 0xbd, 0xa9, 0xf5, 0x98, 0x15, 0x0e, 0xb4, 0xaf, 0xae, 0x1b, 0x94, 0x91, 0x65, 0x8b, 0x39,
|
||||
0x72, 0x5d, 0x95, 0x6d, 0xdc, 0x22, 0x7b, 0x68, 0x3d, 0x2a, 0xb2, 0xec, 0xba, 0x41, 0x91, 0xa7,
|
||||
0xd0, 0x96, 0x2e, 0x0a, 0xe5, 0x35, 0xad, 0xda, 0x3b, 0x37, 0x04, 0x6d, 0x89, 0x53, 0x26, 0x52,
|
||||
0xa4, 0xa5, 0x1b, 0xf9, 0x12, 0x06, 0xf1, 0x5c, 0x69, 0x71, 0x29, 0xc3, 0x24, 0xb8, 0xc8, 0xd8,
|
||||
0x77, 0xa8, 0x83, 0x0b, 0x91, 0xf1, 0x48, 0x05, 0x1a, 0xb9, 0x8e, 0x83, 0x44, 0x79, 0xad, 0x41,
|
||||
0x7d, 0xd8, 0xa3, 0x8f, 0x4a, 0xde, 0xc4, 0xd2, 0x26, 0x96, 0x35, 0x33, 0xa4, 0x13, 0xe5, 0xfb,
|
||||
0xd0, 0xb4, 0x89, 0x24, 0x5d, 0xd8, 0x3e, 0x3e, 0x9d, 0xbc, 0x38, 0x3f, 0x7d, 0xd6, 0xdf, 0x22,
|
||||
0x77, 0xa1, 0xfd, 0xe2, 0x7c, 0xe6, 0x76, 0x35, 0xff, 0x47, 0xd8, 0xce, 0x8b, 0x46, 0x08, 0x34,
|
||||
0xb8, 0x88, 0xd0, 0x16, 0xb9, 0x43, 0xed, 0x9a, 0x7c, 0x08, 0x44, 0xb1, 0x18, 0xa3, 0x6c, 0x81,
|
||||
0x51, 0x30, 0xe7, 0x4a, 0x87, 0x9c, 0xa1, 0x2d, 0x66, 0x87, 0xde, 0x2f, 0x91, 0xe3, 0x1c, 0x20,
|
||||
0xfb, 0xf0, 0x60, 0x45, 0xe7, 0x61, 0x82, 0x2a, 0x0d, 0x19, 0x7a, 0x75, 0xcb, 0x5f, 0x29, 0x9d,
|
||||
0x16, 0x88, 0xff, 0x6b, 0x0d, 0xee, 0xad, 0x95, 0x8a, 0x1c, 0x40, 0x47, 0x89, 0x4c, 0x32, 0x0c,
|
||||
0xe6, 0x69, 0xde, 0x71, 0xaf, 0x97, 0x29, 0xcc, 0x7b, 0xf5, 0xf8, 0xec, 0x69, 0x14, 0x49, 0xd3,
|
||||
0x6a, 0x6d, 0xc7, 0x3d, 0x4e, 0xc9, 0x2e, 0xb4, 0x99, 0xe0, 0x1c, 0x99, 0x56, 0xf6, 0x86, 0x3d,
|
||||
0x5a, 0xee, 0xc9, 0x57, 0xd0, 0x8d, 0xe6, 0xaa, 0x84, 0x5d, 0xf7, 0xbc, 0xb7, 0xb1, 0x30, 0xe5,
|
||||
0x65, 0xa6, 0x59, 0x92, 0x84, 0x72, 0x49, 0xab, 0xde, 0xe4, 0x13, 0x68, 0xdb, 0x59, 0x61, 0x62,
|
||||
0xe1, 0x35, 0x6c, 0x5f, 0x7b, 0xeb, 0xf7, 0x3b, 0xcb, 0x71, 0x5a, 0x32, 0xfd, 0xdf, 0x6b, 0x70,
|
||||
0x6f, 0xad, 0x7d, 0xc8, 0xe7, 0xd0, 0xd5, 0xa1, 0xbc, 0x44, 0x1d, 0x84, 0x51, 0x24, 0xf3, 0x60,
|
||||
0x77, 0xd7, 0xc5, 0x66, 0x2c, 0x2d, 0xa2, 0x05, 0x47, 0x37, 0xdb, 0xff, 0x7b, 0xbc, 0x14, 0xfa,
|
||||
0xeb, 0xb2, 0x64, 0x0f, 0xba, 0x51, 0x26, 0x43, 0x3d, 0x17, 0xdc, 0x34, 0xb1, 0x89, 0xb7, 0x41,
|
||||
0xa1, 0x30, 0x9d, 0x28, 0xf2, 0x06, 0xc0, 0xc5, 0x52, 0xa3, 0x0a, 0x14, 0x72, 0x6d, 0xa3, 0x6a,
|
||||
0xd0, 0x8e, 0xb5, 0x4c, 0x91, 0x6b, 0xff, 0xa7, 0x1a, 0xdc, 0xad, 0x0e, 0x0d, 0x79, 0x02, 0x75,
|
||||
0xa6, 0xaf, 0xf2, 0xc4, 0xbd, 0x75, 0xdb, 0xa0, 0x1d, 0xea, 0x2b, 0x6a, 0xf8, 0xe4, 0x15, 0x68,
|
||||
0x32, 0x91, 0xe5, 0x27, 0xf4, 0xa8, 0xdb, 0x90, 0x67, 0xd0, 0x91, 0xa8, 0x52, 0xc1, 0x15, 0x16,
|
||||
0x29, 0x7b, 0x7c, 0x83, 0xa4, 0x63, 0xba, 0xe1, 0x5d, 0x39, 0xfa, 0xbf, 0xd4, 0x00, 0x56, 0xe7,
|
||||
0xfd, 0xeb, 0x6e, 0x5e, 0x6b, 0x8d, 0x3b, 0xff, 0xa8, 0x35, 0x1e, 0x41, 0x27, 0xcc, 0x74, 0x2c,
|
||||
0xe4, 0x5c, 0x2f, 0xf3, 0xe9, 0x5b, 0x19, 0xc8, 0x18, 0x5a, 0x09, 0xea, 0x58, 0x44, 0xb6, 0x9a,
|
||||
0x7f, 0xa1, 0x7a, 0xa4, 0x75, 0x7a, 0x62, 0x19, 0x34, 0x67, 0x9a, 0xc7, 0x21, 0x0d, 0x75, 0xec,
|
||||
0x35, 0xdd, 0xe3, 0x60, 0xd6, 0xfe, 0xcf, 0x35, 0xf3, 0x57, 0x51, 0x49, 0x03, 0x39, 0xa8, 0x96,
|
||||
0xe3, 0xed, 0x5b, 0x73, 0x57, 0xd6, 0xe3, 0x00, 0x1e, 0x16, 0x09, 0x0c, 0x16, 0xa1, 0x46, 0xce,
|
||||
0x96, 0x81, 0xad, 0x89, 0xe9, 0x6c, 0xf3, 0xd0, 0xbd, 0x5a, 0xc0, 0x5f, 0x3b, 0xf4, 0xd0, 0x82,
|
||||
0xe4, 0x09, 0x34, 0x90, 0x47, 0x45, 0xb1, 0xde, 0xdc, 0x78, 0xe0, 0x73, 0xa1, 0x5c, 0x9d, 0x2c,
|
||||
0xdd, 0xff, 0x14, 0xba, 0x95, 0x2b, 0x90, 0x21, 0xf4, 0x63, 0xad, 0xd3, 0x40, 0xe9, 0x50, 0x67,
|
||||
0x2a, 0x60, 0xc5, 0x23, 0xd8, 0xa3, 0x3b, 0xc6, 0x3e, 0xb5, 0xe6, 0x43, 0x11, 0xa1, 0xff, 0x12,
|
||||
0xda, 0x85, 0x14, 0xf9, 0xa8, 0x1a, 0xeb, 0xde, 0x4d, 0x47, 0x97, 0x61, 0x7e, 0x01, 0xdb, 0x4a,
|
||||
0x4b, 0x0c, 0x13, 0x17, 0xd6, 0x4d, 0xed, 0x35, 0xb5, 0xbc, 0x62, 0x1c, 0x0b, 0x37, 0xff, 0x25,
|
||||
0xb4, 0x9c, 0x20, 0x79, 0x1f, 0xfa, 0x97, 0x32, 0x65, 0x7f, 0xbe, 0xf4, 0xd1, 0x16, 0xdd, 0x31,
|
||||
0xc8, 0xea, 0xda, 0x86, 0x2b, 0x51, 0xa1, 0x0e, 0x50, 0x4a, 0x21, 0x1d, 0xf7, 0x4e, 0xc1, 0xb5,
|
||||
0xc8, 0x73, 0x03, 0x58, 0xee, 0x6b, 0xd0, 0x14, 0x3a, 0x46, 0x69, 0xdb, 0xa6, 0x7d, 0xb4, 0x45,
|
||||
0xdd, 0x76, 0xd2, 0x84, 0x3a, 0xf2, 0xc8, 0x4f, 0xa1, 0x77, 0xed, 0x6a, 0xff, 0x75, 0xa4, 0x8d,
|
||||
0xff, 0xb7, 0xd2, 0xfc, 0x1f, 0x38, 0xbc, 0x6e, 0xf3, 0x0e, 0xce, 0x64, 0x67, 0xbe, 0x0f, 0x3b,
|
||||
0xc5, 0xc7, 0x81, 0x2b, 0xd9, 0x38, 0x86, 0xce, 0xac, 0x48, 0x14, 0xf9, 0x06, 0x5a, 0x0e, 0x26,
|
||||
0x8f, 0xff, 0xde, 0xc7, 0xc5, 0xee, 0xbb, 0xb7, 0xf2, 0xdc, 0x39, 0xfe, 0xd6, 0x45, 0xcb, 0xbe,
|
||||
0x66, 0x1f, 0xff, 0x11, 0x00, 0x00, 0xff, 0xff, 0x1e, 0x60, 0x0d, 0x22, 0x64, 0x09, 0x00, 0x00,
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
@ -98,6 +99,29 @@ var (
|
|||
FramesSent: 4,
|
||||
}
|
||||
ports = []uint32{3333, 6262}
|
||||
|
||||
// latencyBucketBounds holds the maximum value (inclusive, in tenths of a
|
||||
// millisecond) that may be counted in a given histogram bucket.
|
||||
|
||||
// These values are one order of magnitude greater than the controller's
|
||||
// Prometheus buckets, because the proxy will reports latencies in tenths
|
||||
// of a millisecond rather than whole milliseconds.
|
||||
latencyBucketBounds = [26]uint32{
|
||||
// prometheus.LinearBuckets(1, 1, 5),
|
||||
10, 20, 30, 40, 50,
|
||||
// prometheus.LinearBuckets(10, 10, 5),
|
||||
100, 200, 300, 400, 50,
|
||||
// prometheus.LinearBuckets(100, 100, 5),
|
||||
1000, 2000, 3000, 4000, 5000,
|
||||
// prometheus.LinearBuckets(1000, 1000, 5),
|
||||
10000, 20000, 30000, 40000, 5000,
|
||||
// prometheus.LinearBuckets(10000, 10000, 5),
|
||||
100000, 200000, 300000, 400000, 500000,
|
||||
// Prometheus implicitly creates a max bucket for everything that
|
||||
// falls outside of the highest-valued bucket, but we need to
|
||||
// create it explicitly.
|
||||
math.MaxUint32,
|
||||
}
|
||||
)
|
||||
|
||||
func randomPort() uint32 {
|
||||
|
@ -108,18 +132,15 @@ func randomCount() uint32 {
|
|||
return uint32(rand.Int31n(100) + 1)
|
||||
}
|
||||
|
||||
func randomLatencies(count uint32) (latencies []*pb.Latency) {
|
||||
func randomLatencies(count uint32) []uint32 {
|
||||
latencies := make([]uint32, len(latencyBucketBounds))
|
||||
for i := uint32(0); i < count; i++ {
|
||||
|
||||
// The latency value with precision to 100µs.
|
||||
latencyValue := uint32(rand.Int31n(int32(time.Second / (time.Millisecond * 10))))
|
||||
latency := pb.Latency{
|
||||
Latency: latencyValue,
|
||||
Count: 1,
|
||||
// Randomly select a bucket to increment.
|
||||
bucket := uint32(rand.Int31n(int32(len(latencies))))
|
||||
latencies[bucket]++
|
||||
}
|
||||
latencies = append(latencies, &latency)
|
||||
}
|
||||
return
|
||||
return latencies
|
||||
}
|
||||
|
||||
func randomGrpcEos(count uint32) (eos []*pb.EosScope) {
|
||||
|
@ -303,7 +324,7 @@ func main() {
|
|||
Ctx: &pb.ResponseCtx{
|
||||
HttpStatusCode: http.StatusOK,
|
||||
},
|
||||
ResponseLatencies: randomLatencies(count),
|
||||
ResponseLatencyCounts: randomLatencies(count),
|
||||
Ends: randomGrpcEos(count),
|
||||
},
|
||||
},
|
||||
|
@ -327,12 +348,14 @@ func main() {
|
|||
Ctx: &pb.ResponseCtx{
|
||||
HttpStatusCode: randomHttpResponseCode(),
|
||||
},
|
||||
ResponseLatencies: randomLatencies(count),
|
||||
ResponseLatencyCounts: randomLatencies(count),
|
||||
Ends: randomH2Eos(count),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
HistogramBucketBoundsTenthMs: latencyBucketBounds[:],
|
||||
}
|
||||
|
||||
_, err = client.Report(context.Background(), req)
|
||||
|
|
|
@ -272,8 +272,8 @@ func (s *server) Report(ctx context.Context, req *write.ReportRequest) (*write.R
|
|||
id = req.Process.ScheduledNamespace + "/" + req.Process.ScheduledInstance
|
||||
}
|
||||
|
||||
log := log.WithFields(log.Fields{"id": id})
|
||||
log.Debugf("Received report with %d requests", len(req.Requests))
|
||||
logCtx := log.WithFields(log.Fields{"id": id})
|
||||
logCtx.Debugf("Received report with %d requests", len(req.Requests))
|
||||
|
||||
s.instances.update(id)
|
||||
|
||||
|
@ -290,17 +290,28 @@ func (s *server) Report(ctx context.Context, req *write.ReportRequest) (*write.R
|
|||
return nil, errors.New("ResponseCtx is required")
|
||||
}
|
||||
|
||||
for _, latency := range responseScope.ResponseLatencies {
|
||||
// The latencies as received from the proxy are represented as an array of
|
||||
// latency values in tenths of a millisecond, and a count of the number of
|
||||
// times a request of that latency was observed.
|
||||
// Validate this ResponseScope's latency histogram.
|
||||
numBuckets := len(responseScope.ResponseLatencyCounts)
|
||||
expectedNumBuckets := len(req.HistogramBucketBoundsTenthMs)
|
||||
if numBuckets != expectedNumBuckets {
|
||||
err := errors.New(
|
||||
"received report with incorrect number of latency buckets")
|
||||
logCtx.WithFields(log.Fields{
|
||||
"numBuckets": numBuckets,
|
||||
"expected": expectedNumBuckets,
|
||||
"scope": responseScope,
|
||||
}).WithError(err).Error()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// First, convert the latency value from tenths of a ms to ms and
|
||||
// convert from u32 to f64.
|
||||
latencyMs := float64(latency.Latency * 10)
|
||||
for i := uint32(0); i < latency.Count; i++ {
|
||||
// Then, report that latency value to Prometheus a number of times
|
||||
// equal to the count reported by the proxy.
|
||||
for bucketNum, count := range responseScope.ResponseLatencyCounts {
|
||||
// Look up the bucket max value corresponding to this position
|
||||
// in the report's latency histogram.
|
||||
latencyTenthsMs := req.HistogramBucketBoundsTenthMs[bucketNum]
|
||||
latencyMs := float64(latencyTenthsMs) / 10
|
||||
for i := uint32(0); i < count; i++ {
|
||||
// Then, report that latency value to Prometheus a number
|
||||
// of times equal to the count reported by the proxy.
|
||||
latencyStat.Observe(latencyMs)
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,16 @@ message ReportRequest {
|
|||
repeated ClientTransport client_transports = 4;
|
||||
|
||||
repeated RequestScope requests = 5;
|
||||
|
||||
// The inclusive upper bound of each bucket in the response latency histogram,
|
||||
// in tenths of a millisecond.
|
||||
//
|
||||
// Each ResponseScope message will contain an array of numbers representing
|
||||
// the number of observed response latencies in each bucket of the latency
|
||||
// histogram. Since the structure of the latency histogram will be the same
|
||||
// across all ResponseScopes, we only need to report the max values for these
|
||||
// buckets a single time.
|
||||
repeated uint32 histogram_bucket_bounds_tenth_ms = 6;
|
||||
}
|
||||
|
||||
message Process {
|
||||
|
@ -67,21 +77,14 @@ message RequestCtx {
|
|||
string path = 5;
|
||||
}
|
||||
|
||||
// A latency value in tenths of a millisecond and a count of the times
|
||||
// that latency was observed.
|
||||
message Latency {
|
||||
// Latency value in tenths of a millisecond.
|
||||
uint32 latency = 1;
|
||||
// Count of occurences of this latency value.
|
||||
uint32 count = 2;
|
||||
}
|
||||
|
||||
message ResponseScope {
|
||||
ResponseCtx ctx = 1;
|
||||
// Response latencies (time from request headers sent to response headers received),
|
||||
// represented as an array of observed latency values with precision to 100µs and
|
||||
// the number of times those values were observed, ordered by the latency value.
|
||||
repeated Latency response_latencies = 2;
|
||||
// Response latencies (time from request headers sent to response headers
|
||||
// received). Represented as a histogram with buckets whose inclusive
|
||||
// upper bounds are given in the `histogram_bucket_bounds_tenths_ms` array in
|
||||
// `ReportRequest`. Each number in this array represents the number of times a
|
||||
// latency falling into that bucket was observed.
|
||||
repeated uint32 response_latency_counts = 2;
|
||||
repeated EosScope ends = 3;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
#![deny(missing_docs)]
|
||||
use std::{ops, slice, u32};
|
||||
use std::default::Default;
|
||||
use std::time::Duration;
|
||||
|
||||
/// The number of buckets in a latency histogram.
|
||||
pub const NUM_BUCKETS: usize = 26;
|
||||
|
||||
/// The maximum value (inclusive) for each latency bucket in
|
||||
/// tenths of a millisecond.
|
||||
pub const BUCKET_BOUNDS: [Latency; NUM_BUCKETS] = [
|
||||
// The controller telemetry server creates 5 sets of 5 linear buckets
|
||||
// each:
|
||||
// TODO: it would be nice if we didn't have to hard-code each
|
||||
// individual bucket and could use Rust ranges or something.
|
||||
// However, because we're using a raw fixed size array rather
|
||||
// than a vector (as we don't ever expect to grow this array
|
||||
// and thus don't _need_ a vector) we can't concatenate it
|
||||
// from smaller arrays, making it difficult to construct
|
||||
// programmatically...
|
||||
// in the controller:
|
||||
// prometheus.LinearBuckets(1, 1, 5),
|
||||
Latency(10),
|
||||
Latency(20),
|
||||
Latency(30),
|
||||
Latency(40),
|
||||
Latency(50),
|
||||
// prometheus.LinearBuckets(10, 10, 5),
|
||||
Latency(100),
|
||||
Latency(200),
|
||||
Latency(300),
|
||||
Latency(400),
|
||||
Latency(500),
|
||||
// prometheus.LinearBuckets(100, 100, 5),
|
||||
Latency(1_000),
|
||||
Latency(2_000),
|
||||
Latency(3_000),
|
||||
Latency(4_000),
|
||||
Latency(5_000),
|
||||
// prometheus.LinearBuckets(1000, 1000, 5),
|
||||
Latency(10_000),
|
||||
Latency(20_000),
|
||||
Latency(30_000),
|
||||
Latency(40_000),
|
||||
Latency(50_000),
|
||||
// prometheus.LinearBuckets(10000, 10000, 5),
|
||||
Latency(100_000),
|
||||
Latency(200_000),
|
||||
Latency(300_000),
|
||||
Latency(400_000),
|
||||
Latency(500_000),
|
||||
// Prometheus implicitly creates a max bucket for everything that
|
||||
// falls outside of the highest-valued bucket, but we need to
|
||||
// create it explicitly.
|
||||
Latency(u32::MAX),
|
||||
];
|
||||
|
||||
/// A series of latency values and counts.
|
||||
#[derive(Debug)]
|
||||
pub struct Histogram([u32; NUM_BUCKETS]);
|
||||
|
||||
/// A latency in tenths of a millisecond.
|
||||
#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash)]
|
||||
pub struct Latency(u32);
|
||||
|
||||
|
||||
// ===== impl Histogram =====
|
||||
|
||||
impl Histogram {
|
||||
|
||||
/// Observe a measurement
|
||||
pub fn observe<I>(&mut self, measurement: I)
|
||||
where
|
||||
I: Into<Latency>,
|
||||
{
|
||||
let measurement = measurement.into();
|
||||
let i = BUCKET_BOUNDS.iter()
|
||||
.position(|max| &measurement <= max)
|
||||
.expect("latency value greater than u32::MAX; this shouldn't be \
|
||||
possible.");
|
||||
self.0[i] += 1;
|
||||
}
|
||||
|
||||
/// Construct a new, empty `Histogram`.
|
||||
pub fn new() -> Self {
|
||||
Histogram([0; NUM_BUCKETS])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<I> ops::AddAssign<I> for Histogram
|
||||
where
|
||||
I: Into<Latency>
|
||||
{
|
||||
#[inline]
|
||||
fn add_assign(&mut self, measurement: I) {
|
||||
self.observe(measurement)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
impl<'a> IntoIterator for &'a Histogram {
|
||||
type Item = &'a u32;
|
||||
type IntoIter = slice::Iter<'a, u32>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
impl Default for Histogram {
|
||||
#[inline]
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Latency =====
|
||||
|
||||
|
||||
const SEC_TO_MS: u32 = 1_000;
|
||||
const SEC_TO_TENTHS_OF_A_MS: u32 = SEC_TO_MS * 10;
|
||||
const TENTHS_OF_MS_TO_NS: u32 = MS_TO_NS / 10;
|
||||
/// Conversion ratio from milliseconds to nanoseconds.
|
||||
pub const MS_TO_NS: u32 = 1_000_000;
|
||||
|
||||
impl From<Duration> for Latency {
|
||||
fn from(dur: Duration) -> Self {
|
||||
let secs = dur.as_secs();
|
||||
// checked conversion from u64 -> u32.
|
||||
let secs =
|
||||
if secs >= u64::from(u32::MAX) {
|
||||
None
|
||||
} else {
|
||||
Some(secs as u32)
|
||||
};
|
||||
// represent the duration as tenths of a ms.
|
||||
let tenths_of_ms = {
|
||||
let t = secs.and_then(|as_secs|
|
||||
// convert the number of seconds to tenths of a ms, or
|
||||
// None on overflow.
|
||||
as_secs.checked_mul(SEC_TO_TENTHS_OF_A_MS)
|
||||
);
|
||||
let t = t.and_then(|as_tenths_ms| {
|
||||
// convert the subsecond part of the duration (in ns) to
|
||||
// tenths of a millisecond.
|
||||
let subsec_tenths_ms = dur.subsec_nanos() / TENTHS_OF_MS_TO_NS;
|
||||
as_tenths_ms.checked_add(subsec_tenths_ms)
|
||||
});
|
||||
t.unwrap_or_else(|| {
|
||||
debug!(
|
||||
"{:?} too large to represent as tenths of a \
|
||||
millisecond!",
|
||||
dur
|
||||
);
|
||||
u32::MAX
|
||||
})
|
||||
};
|
||||
Latency(tenths_of_ms)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u32> for Latency {
|
||||
#[inline]
|
||||
fn from(value: u32) -> Self {
|
||||
Latency(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<u32> for Latency {
|
||||
fn into(self) -> u32 {
|
||||
self.0
|
||||
}
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
use std::{u32, u64};
|
||||
use std::net;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{u32, u64};
|
||||
|
||||
use http;
|
||||
use ordermap::OrderMap;
|
||||
|
@ -16,7 +16,6 @@ use conduit_proxy_controller_grpc::telemetry::{
|
|||
eos_ctx,
|
||||
EosCtx,
|
||||
EosScope,
|
||||
Latency as PbLatency,
|
||||
ReportRequest,
|
||||
RequestCtx,
|
||||
RequestScope,
|
||||
|
@ -29,6 +28,8 @@ use conduit_proxy_controller_grpc::telemetry::{
|
|||
use ctx;
|
||||
use telemetry::event::Event;
|
||||
|
||||
mod latency;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Metrics {
|
||||
sources: OrderMap<net::IpAddr, TransportStats>,
|
||||
|
@ -51,14 +52,6 @@ struct RequestStats {
|
|||
responses: OrderMap<Option<http::StatusCode>, ResponseStats>,
|
||||
}
|
||||
|
||||
/// A latency in tenths of a millisecond.
|
||||
#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash)]
|
||||
struct Latency(pub u32);
|
||||
|
||||
/// A series of latency values and counts.
|
||||
#[derive(Debug, Default)]
|
||||
struct Latencies(pub OrderMap<Latency, u32>);
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct ResponseStats {
|
||||
ends: OrderMap<End, Vec<EndStats>>,
|
||||
|
@ -66,7 +59,7 @@ struct ResponseStats {
|
|||
///
|
||||
/// Observed latencies are mapped to a count of the times that
|
||||
/// latency value was seen.
|
||||
latencies: Latencies,
|
||||
latencies: latency::Histogram,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -139,7 +132,7 @@ impl Metrics {
|
|||
.entry(End::Reset(fail.error.into()))
|
||||
.or_insert_with(Default::default);
|
||||
|
||||
stats.latencies.add(fail.since_request_open);
|
||||
stats.latencies += fail.since_request_open;
|
||||
ends.push(EndStats {
|
||||
// We never got a response, but we need to a count
|
||||
// for this request + end, so a 0 EndStats is used.
|
||||
|
@ -153,8 +146,8 @@ impl Metrics {
|
|||
}
|
||||
|
||||
Event::StreamResponseOpen(ref res, ref open) => {
|
||||
self.response(res).latencies.add(open.since_request_open);
|
||||
}
|
||||
self.response(res).latencies += open.since_request_open;
|
||||
},
|
||||
Event::StreamResponseFail(ref res, ref fail) => {
|
||||
self.response_end(res, End::Reset(fail.error.into()))
|
||||
.push(EndStats {
|
||||
|
@ -219,6 +212,11 @@ impl Metrics {
|
|||
}
|
||||
|
||||
pub fn generate_report(&mut self) -> ReportRequest {
|
||||
let histogram_bucket_bounds_tenth_ms: Vec<u32> =
|
||||
latency::BUCKET_BOUNDS.iter()
|
||||
.map(|&latency| latency.into())
|
||||
.collect();
|
||||
|
||||
let mut server_transports = Vec::new();
|
||||
let mut client_transports = Vec::new();
|
||||
|
||||
|
@ -282,7 +280,10 @@ impl Metrics {
|
|||
}
|
||||
}),
|
||||
ends: ends,
|
||||
response_latencies: res_stats.latencies.into(),
|
||||
response_latency_counts: res_stats.latencies
|
||||
.into_iter()
|
||||
.map(|l| *l)
|
||||
.collect(),
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -312,70 +313,18 @@ impl Metrics {
|
|||
server_transports,
|
||||
client_transports,
|
||||
requests,
|
||||
histogram_bucket_bounds_tenth_ms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Latency =====
|
||||
|
||||
const MS_TO_NS: u32 = 1_000_000;
|
||||
|
||||
impl From<Duration> for Latency {
|
||||
fn from(dur: Duration) -> Self {
|
||||
// TODO: represent ms conversion at type level...
|
||||
let as_ms = dur_to_ms(dur);
|
||||
|
||||
// checked conversion to u32.
|
||||
let as_ms = if as_ms > u64::from(u32::MAX) {
|
||||
None
|
||||
} else {
|
||||
Some(as_ms as u32)
|
||||
};
|
||||
|
||||
// divide the duration as ms by ten to get the value in tenths of a ms.
|
||||
let as_tenths = as_ms.and_then(|ms| ms.checked_div(10)).unwrap_or_else(|| {
|
||||
debug!("{:?} too large to convert to tenths of a millisecond!", dur);
|
||||
u32::MAX
|
||||
});
|
||||
|
||||
Latency(as_tenths)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Latencies =====
|
||||
|
||||
impl Latencies {
|
||||
#[inline]
|
||||
fn add<L: Into<Latency>>(&mut self, latency: L) {
|
||||
let value = self.0.entry(latency.into()).or_insert(0);
|
||||
*value += 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<Vec<PbLatency>> for Latencies {
|
||||
fn into(mut self) -> Vec<PbLatency> {
|
||||
// NOTE: `OrderMap.drain` means we can reuse the allocated memory --- can we
|
||||
// ensure we're not allocating a new OrderMap after covnerting to pb?
|
||||
self.0
|
||||
.drain(..)
|
||||
.map(|(Latency(latency), count)| {
|
||||
PbLatency {
|
||||
latency,
|
||||
count,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn dur_to_ms(dur: Duration) -> u64 {
|
||||
dur.as_secs()
|
||||
// note that this could just be saturating addition if we didn't want
|
||||
// to log if an overflow occurs...
|
||||
.checked_mul(1_000)
|
||||
.and_then(|as_millis| {
|
||||
let subsec = u64::from(dur.subsec_nanos() / MS_TO_NS);
|
||||
let subsec = u64::from(dur.subsec_nanos() / latency::MS_TO_NS);
|
||||
as_millis.checked_add(subsec)
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
|
@ -383,40 +332,3 @@ fn dur_to_ms(dur: Duration) -> u64 {
|
|||
u64::MAX
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn latencies_incr() {
|
||||
let mut latencies = Latencies::default();
|
||||
assert!(latencies.0.is_empty());
|
||||
|
||||
latencies.add(Duration::from_secs(10));
|
||||
assert_eq!(
|
||||
latencies.0.get(&Latency::from(Duration::from_secs(10))),
|
||||
Some(&1)
|
||||
);
|
||||
|
||||
latencies.add(Duration::from_secs(15));
|
||||
assert_eq!(
|
||||
latencies.0.get(&Latency::from(Duration::from_secs(10))),
|
||||
Some(&1)
|
||||
);
|
||||
assert_eq!(
|
||||
latencies.0.get(&Latency::from(Duration::from_secs(15))),
|
||||
Some(&1)
|
||||
);
|
||||
|
||||
latencies.add(Duration::from_secs(10));
|
||||
assert_eq!(
|
||||
latencies.0.get(&Latency::from(Duration::from_secs(10))),
|
||||
Some(&2)
|
||||
);
|
||||
assert_eq!(
|
||||
latencies.0.get(&Latency::from(Duration::from_secs(15))),
|
||||
Some(&1)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ extern crate url;
|
|||
pub extern crate env_logger;
|
||||
|
||||
use self::bytes::{BigEndian, Bytes, BytesMut};
|
||||
pub use self::conduit_proxy::*;
|
||||
pub use self::futures::*;
|
||||
use self::futures::sync::oneshot;
|
||||
pub use self::http::{HeaderMap, Request, Response};
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use support::*;
|
||||
|
||||
|
@ -59,6 +60,24 @@ impl Server {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn route_with_latency(
|
||||
mut self,
|
||||
path: &str,
|
||||
resp: &str,
|
||||
latency: Duration
|
||||
) -> Self {
|
||||
let resp = resp.to_owned();
|
||||
let route = Route(Box::new(move |_| {
|
||||
thread::sleep(latency);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(resp.clone())
|
||||
.unwrap()
|
||||
}));
|
||||
self.routes.insert(path.into(), route);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn run(self) -> Listening {
|
||||
let (tx, rx) = shutdown_signal();
|
||||
let (addr_tx, addr_rx) = oneshot::channel();
|
||||
|
|
|
@ -42,7 +42,12 @@ fn inbound_sends_telemetry() {
|
|||
// responses
|
||||
let res = &req.responses[0];
|
||||
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
|
||||
assert_eq!(res.response_latencies.len(), 1);
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
assert_eq!(res.ends.len(), 1);
|
||||
// ends
|
||||
let ends = &res.ends[0];
|
||||
|
@ -53,6 +58,7 @@ fn inbound_sends_telemetry() {
|
|||
assert_eq!(stream.frames_sent, 1);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn http1_inbound_sends_telemetry() {
|
||||
let _ = env_logger::init();
|
||||
|
@ -87,7 +93,12 @@ fn http1_inbound_sends_telemetry() {
|
|||
// responses
|
||||
let res = &req.responses[0];
|
||||
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
|
||||
assert_eq!(res.response_latencies.len(), 1);
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
assert_eq!(res.ends.len(), 1);
|
||||
// ends
|
||||
let ends = &res.ends[0];
|
||||
|
@ -98,6 +109,181 @@ fn http1_inbound_sends_telemetry() {
|
|||
assert_eq!(stream.frames_sent, 1);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn inbound_aggregates_telemetry_over_several_requests() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
info!("running test server");
|
||||
let srv = server::new()
|
||||
.route("/hey", "hello")
|
||||
.route("/hi", "good morning")
|
||||
.run();
|
||||
|
||||
let mut ctrl = controller::new();
|
||||
let reports = ctrl.reports();
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl.run())
|
||||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_millis(500))
|
||||
.run();
|
||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
||||
info!("client.get(/hi)");
|
||||
assert_eq!(client.get("/hi"), "good morning");
|
||||
assert_eq!(client.get("/hi"), "good morning");
|
||||
|
||||
info!("awaiting report");
|
||||
let report = reports.wait().next().unwrap().unwrap();
|
||||
// proxy inbound
|
||||
assert_eq!(report.proxy, 0);
|
||||
// process
|
||||
assert_eq!(report.process.as_ref().unwrap().node, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "");
|
||||
|
||||
// requests -----------------------
|
||||
assert_eq!(report.requests.len(), 2);
|
||||
|
||||
// -- first request -----------------
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().path, "/hey");
|
||||
assert_eq!(req.count, 1);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
// ---- response --------------------
|
||||
let res = &req.responses[0];
|
||||
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
assert_eq!(res.ends.len(), 1);
|
||||
|
||||
// ------ ends ----------------------
|
||||
let ends = &res.ends[0];
|
||||
assert_eq!(ends.streams.len(), 1);
|
||||
// -------- streams -----------------
|
||||
let stream = &ends.streams[0];
|
||||
assert_eq!(stream.bytes_sent, 5);
|
||||
assert_eq!(stream.frames_sent, 1);
|
||||
|
||||
// -- second request ----------------
|
||||
let req = &report.requests[1];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().path, "/hi");
|
||||
// repeated twice
|
||||
assert_eq!(req.count, 2);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
// ---- response -------------------
|
||||
let res = &req.responses[0];
|
||||
assert_eq!(res.ctx.as_ref().unwrap().http_status_code, 200);
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
assert_eq!(res.ends.len(), 1);
|
||||
|
||||
// ------ ends ----------------------
|
||||
let ends = &res.ends[0];
|
||||
assert_eq!(ends.streams.len(), 2);
|
||||
|
||||
// -------- streams -----------------
|
||||
let stream = &ends.streams[0];
|
||||
assert_eq!(stream.bytes_sent, 12);
|
||||
assert_eq!(stream.frames_sent, 1);
|
||||
|
||||
}
|
||||
|
||||
// Ignore this test for now, because our method of adding latency to requests
|
||||
// (calling `thread::sleep`) is likely to be flakey, especially on CI.
|
||||
// Eventually, we can add some kind of mock timer system for simulating latency
|
||||
// more reliably, and re-enable this test.
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn records_latency_statistics() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
info!("running test server");
|
||||
let srv = server::new()
|
||||
.route_with_latency("/hey", "hello", Duration::from_millis(500))
|
||||
.route_with_latency("/hi", "good morning", Duration::from_millis(40))
|
||||
.run();
|
||||
|
||||
let mut ctrl = controller::new();
|
||||
let reports = ctrl.reports();
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl.run())
|
||||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_secs(5))
|
||||
.run();
|
||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
||||
info!("client.get(/hi)");
|
||||
assert_eq!(client.get("/hi"), "good morning");
|
||||
assert_eq!(client.get("/hi"), "good morning");
|
||||
|
||||
info!("awaiting report");
|
||||
let report = reports.wait().next().unwrap().unwrap();
|
||||
|
||||
// requests -----------------------
|
||||
assert_eq!(report.requests.len(), 2);
|
||||
// first request
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().path, "/hey");
|
||||
let res = &req.responses[0];
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
for (idx, bucket) in res.response_latency_counts.iter().enumerate() {
|
||||
// 500 ms of extra latency should put us in the 500-1000
|
||||
// millisecond bucket (the 15th bucket)
|
||||
if idx == 15 {
|
||||
assert_eq!(*bucket, 1, "poorly bucketed latencies: {:?}", res.response_latency_counts);
|
||||
} else {
|
||||
assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts);
|
||||
}
|
||||
}
|
||||
|
||||
// second request
|
||||
let req = &report.requests.get(1).expect("second report");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().path, "/hi");
|
||||
assert_eq!(req.count, 2);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
let res = req.responses.get(0).expect("responses[0]");
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
assert_eq!(
|
||||
res.response_latency_counts.len(),
|
||||
report.histogram_bucket_bounds_tenth_ms.len()
|
||||
);
|
||||
for (idx, bucket) in res.response_latency_counts.iter().enumerate() {
|
||||
// 40 ms of extra latency should put us in the 40-50
|
||||
// millisecond bucket (the 10th bucket)
|
||||
if idx == 9 {
|
||||
assert_eq!(*bucket, 2, "poorly bucketed latencies: {:?}", res.response_latency_counts);
|
||||
} else {
|
||||
assert_eq!(*bucket, 0, "poorly bucketed latencies: {:?}", res.response_latency_counts);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_report_errors_are_ignored() {}
|
||||
|
||||
|
|
Loading…
Reference in New Issue