xds: lrs load store (#2779)

Menghan Li 2019-05-07 10:06:40 -07:00 committed by GitHub
parent 4b60e3b6a1
commit 9949ee0c45
5 changed files with 1306 additions and 0 deletions


@@ -0,0 +1,429 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: envoy/api/v2/endpoint/load_report.proto
package envoy_api_v2_endpoint
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import duration "github.com/golang/protobuf/ptypes/duration"
import _struct "github.com/golang/protobuf/ptypes/struct"
import address "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/address"
import base "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
import _ "google.golang.org/grpc/balancer/xds/internal/proto/validate"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type UpstreamLocalityStats struct {
Locality *base.Locality `protobuf:"bytes,1,opt,name=locality,proto3" json:"locality,omitempty"`
TotalSuccessfulRequests uint64 `protobuf:"varint,2,opt,name=total_successful_requests,json=totalSuccessfulRequests,proto3" json:"total_successful_requests,omitempty"`
TotalRequestsInProgress uint64 `protobuf:"varint,3,opt,name=total_requests_in_progress,json=totalRequestsInProgress,proto3" json:"total_requests_in_progress,omitempty"`
TotalErrorRequests uint64 `protobuf:"varint,4,opt,name=total_error_requests,json=totalErrorRequests,proto3" json:"total_error_requests,omitempty"`
LoadMetricStats []*EndpointLoadMetricStats `protobuf:"bytes,5,rep,name=load_metric_stats,json=loadMetricStats,proto3" json:"load_metric_stats,omitempty"`
UpstreamEndpointStats []*UpstreamEndpointStats `protobuf:"bytes,7,rep,name=upstream_endpoint_stats,json=upstreamEndpointStats,proto3" json:"upstream_endpoint_stats,omitempty"`
Priority uint32 `protobuf:"varint,6,opt,name=priority,proto3" json:"priority,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *UpstreamLocalityStats) Reset() { *m = UpstreamLocalityStats{} }
func (m *UpstreamLocalityStats) String() string { return proto.CompactTextString(m) }
func (*UpstreamLocalityStats) ProtoMessage() {}
func (*UpstreamLocalityStats) Descriptor() ([]byte, []int) {
return fileDescriptor_load_report_5485b60725c658b8, []int{0}
}
func (m *UpstreamLocalityStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UpstreamLocalityStats.Unmarshal(m, b)
}
func (m *UpstreamLocalityStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_UpstreamLocalityStats.Marshal(b, m, deterministic)
}
func (dst *UpstreamLocalityStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_UpstreamLocalityStats.Merge(dst, src)
}
func (m *UpstreamLocalityStats) XXX_Size() int {
return xxx_messageInfo_UpstreamLocalityStats.Size(m)
}
func (m *UpstreamLocalityStats) XXX_DiscardUnknown() {
xxx_messageInfo_UpstreamLocalityStats.DiscardUnknown(m)
}
var xxx_messageInfo_UpstreamLocalityStats proto.InternalMessageInfo
func (m *UpstreamLocalityStats) GetLocality() *base.Locality {
if m != nil {
return m.Locality
}
return nil
}
func (m *UpstreamLocalityStats) GetTotalSuccessfulRequests() uint64 {
if m != nil {
return m.TotalSuccessfulRequests
}
return 0
}
func (m *UpstreamLocalityStats) GetTotalRequestsInProgress() uint64 {
if m != nil {
return m.TotalRequestsInProgress
}
return 0
}
func (m *UpstreamLocalityStats) GetTotalErrorRequests() uint64 {
if m != nil {
return m.TotalErrorRequests
}
return 0
}
func (m *UpstreamLocalityStats) GetLoadMetricStats() []*EndpointLoadMetricStats {
if m != nil {
return m.LoadMetricStats
}
return nil
}
func (m *UpstreamLocalityStats) GetUpstreamEndpointStats() []*UpstreamEndpointStats {
if m != nil {
return m.UpstreamEndpointStats
}
return nil
}
func (m *UpstreamLocalityStats) GetPriority() uint32 {
if m != nil {
return m.Priority
}
return 0
}
type UpstreamEndpointStats struct {
Address *address.Address `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
Metadata *_struct.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"`
TotalSuccessfulRequests uint64 `protobuf:"varint,2,opt,name=total_successful_requests,json=totalSuccessfulRequests,proto3" json:"total_successful_requests,omitempty"`
TotalRequestsInProgress uint64 `protobuf:"varint,3,opt,name=total_requests_in_progress,json=totalRequestsInProgress,proto3" json:"total_requests_in_progress,omitempty"`
TotalErrorRequests uint64 `protobuf:"varint,4,opt,name=total_error_requests,json=totalErrorRequests,proto3" json:"total_error_requests,omitempty"`
LoadMetricStats []*EndpointLoadMetricStats `protobuf:"bytes,5,rep,name=load_metric_stats,json=loadMetricStats,proto3" json:"load_metric_stats,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *UpstreamEndpointStats) Reset() { *m = UpstreamEndpointStats{} }
func (m *UpstreamEndpointStats) String() string { return proto.CompactTextString(m) }
func (*UpstreamEndpointStats) ProtoMessage() {}
func (*UpstreamEndpointStats) Descriptor() ([]byte, []int) {
return fileDescriptor_load_report_5485b60725c658b8, []int{1}
}
func (m *UpstreamEndpointStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UpstreamEndpointStats.Unmarshal(m, b)
}
func (m *UpstreamEndpointStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_UpstreamEndpointStats.Marshal(b, m, deterministic)
}
func (dst *UpstreamEndpointStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_UpstreamEndpointStats.Merge(dst, src)
}
func (m *UpstreamEndpointStats) XXX_Size() int {
return xxx_messageInfo_UpstreamEndpointStats.Size(m)
}
func (m *UpstreamEndpointStats) XXX_DiscardUnknown() {
xxx_messageInfo_UpstreamEndpointStats.DiscardUnknown(m)
}
var xxx_messageInfo_UpstreamEndpointStats proto.InternalMessageInfo
func (m *UpstreamEndpointStats) GetAddress() *address.Address {
if m != nil {
return m.Address
}
return nil
}
func (m *UpstreamEndpointStats) GetMetadata() *_struct.Struct {
if m != nil {
return m.Metadata
}
return nil
}
func (m *UpstreamEndpointStats) GetTotalSuccessfulRequests() uint64 {
if m != nil {
return m.TotalSuccessfulRequests
}
return 0
}
func (m *UpstreamEndpointStats) GetTotalRequestsInProgress() uint64 {
if m != nil {
return m.TotalRequestsInProgress
}
return 0
}
func (m *UpstreamEndpointStats) GetTotalErrorRequests() uint64 {
if m != nil {
return m.TotalErrorRequests
}
return 0
}
func (m *UpstreamEndpointStats) GetLoadMetricStats() []*EndpointLoadMetricStats {
if m != nil {
return m.LoadMetricStats
}
return nil
}
type EndpointLoadMetricStats struct {
MetricName string `protobuf:"bytes,1,opt,name=metric_name,json=metricName,proto3" json:"metric_name,omitempty"`
NumRequestsFinishedWithMetric uint64 `protobuf:"varint,2,opt,name=num_requests_finished_with_metric,json=numRequestsFinishedWithMetric,proto3" json:"num_requests_finished_with_metric,omitempty"`
TotalMetricValue float64 `protobuf:"fixed64,3,opt,name=total_metric_value,json=totalMetricValue,proto3" json:"total_metric_value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *EndpointLoadMetricStats) Reset() { *m = EndpointLoadMetricStats{} }
func (m *EndpointLoadMetricStats) String() string { return proto.CompactTextString(m) }
func (*EndpointLoadMetricStats) ProtoMessage() {}
func (*EndpointLoadMetricStats) Descriptor() ([]byte, []int) {
return fileDescriptor_load_report_5485b60725c658b8, []int{2}
}
func (m *EndpointLoadMetricStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EndpointLoadMetricStats.Unmarshal(m, b)
}
func (m *EndpointLoadMetricStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_EndpointLoadMetricStats.Marshal(b, m, deterministic)
}
func (dst *EndpointLoadMetricStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_EndpointLoadMetricStats.Merge(dst, src)
}
func (m *EndpointLoadMetricStats) XXX_Size() int {
return xxx_messageInfo_EndpointLoadMetricStats.Size(m)
}
func (m *EndpointLoadMetricStats) XXX_DiscardUnknown() {
xxx_messageInfo_EndpointLoadMetricStats.DiscardUnknown(m)
}
var xxx_messageInfo_EndpointLoadMetricStats proto.InternalMessageInfo
func (m *EndpointLoadMetricStats) GetMetricName() string {
if m != nil {
return m.MetricName
}
return ""
}
func (m *EndpointLoadMetricStats) GetNumRequestsFinishedWithMetric() uint64 {
if m != nil {
return m.NumRequestsFinishedWithMetric
}
return 0
}
func (m *EndpointLoadMetricStats) GetTotalMetricValue() float64 {
if m != nil {
return m.TotalMetricValue
}
return 0
}
type ClusterStats struct {
ClusterName string `protobuf:"bytes,1,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"`
ClusterServiceName string `protobuf:"bytes,6,opt,name=cluster_service_name,json=clusterServiceName,proto3" json:"cluster_service_name,omitempty"`
UpstreamLocalityStats []*UpstreamLocalityStats `protobuf:"bytes,2,rep,name=upstream_locality_stats,json=upstreamLocalityStats,proto3" json:"upstream_locality_stats,omitempty"`
TotalDroppedRequests uint64 `protobuf:"varint,3,opt,name=total_dropped_requests,json=totalDroppedRequests,proto3" json:"total_dropped_requests,omitempty"`
DroppedRequests []*ClusterStats_DroppedRequests `protobuf:"bytes,5,rep,name=dropped_requests,json=droppedRequests,proto3" json:"dropped_requests,omitempty"`
LoadReportInterval *duration.Duration `protobuf:"bytes,4,opt,name=load_report_interval,json=loadReportInterval,proto3" json:"load_report_interval,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClusterStats) Reset() { *m = ClusterStats{} }
func (m *ClusterStats) String() string { return proto.CompactTextString(m) }
func (*ClusterStats) ProtoMessage() {}
func (*ClusterStats) Descriptor() ([]byte, []int) {
return fileDescriptor_load_report_5485b60725c658b8, []int{3}
}
func (m *ClusterStats) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClusterStats.Unmarshal(m, b)
}
func (m *ClusterStats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClusterStats.Marshal(b, m, deterministic)
}
func (dst *ClusterStats) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClusterStats.Merge(dst, src)
}
func (m *ClusterStats) XXX_Size() int {
return xxx_messageInfo_ClusterStats.Size(m)
}
func (m *ClusterStats) XXX_DiscardUnknown() {
xxx_messageInfo_ClusterStats.DiscardUnknown(m)
}
var xxx_messageInfo_ClusterStats proto.InternalMessageInfo
func (m *ClusterStats) GetClusterName() string {
if m != nil {
return m.ClusterName
}
return ""
}
func (m *ClusterStats) GetClusterServiceName() string {
if m != nil {
return m.ClusterServiceName
}
return ""
}
func (m *ClusterStats) GetUpstreamLocalityStats() []*UpstreamLocalityStats {
if m != nil {
return m.UpstreamLocalityStats
}
return nil
}
func (m *ClusterStats) GetTotalDroppedRequests() uint64 {
if m != nil {
return m.TotalDroppedRequests
}
return 0
}
func (m *ClusterStats) GetDroppedRequests() []*ClusterStats_DroppedRequests {
if m != nil {
return m.DroppedRequests
}
return nil
}
func (m *ClusterStats) GetLoadReportInterval() *duration.Duration {
if m != nil {
return m.LoadReportInterval
}
return nil
}
type ClusterStats_DroppedRequests struct {
Category string `protobuf:"bytes,1,opt,name=category,proto3" json:"category,omitempty"`
DroppedCount uint64 `protobuf:"varint,2,opt,name=dropped_count,json=droppedCount,proto3" json:"dropped_count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ClusterStats_DroppedRequests) Reset() { *m = ClusterStats_DroppedRequests{} }
func (m *ClusterStats_DroppedRequests) String() string { return proto.CompactTextString(m) }
func (*ClusterStats_DroppedRequests) ProtoMessage() {}
func (*ClusterStats_DroppedRequests) Descriptor() ([]byte, []int) {
return fileDescriptor_load_report_5485b60725c658b8, []int{3, 0}
}
func (m *ClusterStats_DroppedRequests) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ClusterStats_DroppedRequests.Unmarshal(m, b)
}
func (m *ClusterStats_DroppedRequests) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ClusterStats_DroppedRequests.Marshal(b, m, deterministic)
}
func (dst *ClusterStats_DroppedRequests) XXX_Merge(src proto.Message) {
xxx_messageInfo_ClusterStats_DroppedRequests.Merge(dst, src)
}
func (m *ClusterStats_DroppedRequests) XXX_Size() int {
return xxx_messageInfo_ClusterStats_DroppedRequests.Size(m)
}
func (m *ClusterStats_DroppedRequests) XXX_DiscardUnknown() {
xxx_messageInfo_ClusterStats_DroppedRequests.DiscardUnknown(m)
}
var xxx_messageInfo_ClusterStats_DroppedRequests proto.InternalMessageInfo
func (m *ClusterStats_DroppedRequests) GetCategory() string {
if m != nil {
return m.Category
}
return ""
}
func (m *ClusterStats_DroppedRequests) GetDroppedCount() uint64 {
if m != nil {
return m.DroppedCount
}
return 0
}
func init() {
proto.RegisterType((*UpstreamLocalityStats)(nil), "envoy.api.v2.endpoint.UpstreamLocalityStats")
proto.RegisterType((*UpstreamEndpointStats)(nil), "envoy.api.v2.endpoint.UpstreamEndpointStats")
proto.RegisterType((*EndpointLoadMetricStats)(nil), "envoy.api.v2.endpoint.EndpointLoadMetricStats")
proto.RegisterType((*ClusterStats)(nil), "envoy.api.v2.endpoint.ClusterStats")
proto.RegisterType((*ClusterStats_DroppedRequests)(nil), "envoy.api.v2.endpoint.ClusterStats.DroppedRequests")
}
func init() {
proto.RegisterFile("envoy/api/v2/endpoint/load_report.proto", fileDescriptor_load_report_5485b60725c658b8)
}
var fileDescriptor_load_report_5485b60725c658b8 = []byte{
// 737 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x55, 0x5b, 0x6e, 0xd3, 0x4c,
0x18, 0x95, 0x93, 0xb4, 0x4d, 0x27, 0xad, 0xd2, 0x7f, 0xd4, 0xfe, 0x49, 0xfd, 0x5f, 0x1a, 0x52,
0x21, 0xf2, 0x50, 0xd9, 0x55, 0x5a, 0x09, 0x04, 0x4f, 0xa4, 0x2d, 0xa2, 0xa2, 0xa0, 0xca, 0x11,
0x20, 0x21, 0x81, 0x35, 0xb5, 0xa7, 0xe9, 0x48, 0xb6, 0xc7, 0xcc, 0x8c, 0x0d, 0x59, 0x02, 0xaf,
0x2c, 0x81, 0x25, 0xf0, 0xc8, 0x13, 0xdb, 0x60, 0x09, 0xec, 0x02, 0x79, 0x2e, 0xce, 0xb5, 0x12,
0x0b, 0xe0, 0xcd, 0xfe, 0xce, 0x39, 0xfe, 0x6e, 0x67, 0xc6, 0xe0, 0x1e, 0x4e, 0x72, 0x3a, 0x76,
0x51, 0x4a, 0xdc, 0xbc, 0xef, 0xe2, 0x24, 0x4c, 0x29, 0x49, 0x84, 0x1b, 0x51, 0x14, 0xfa, 0x0c,
0xa7, 0x94, 0x09, 0x27, 0x65, 0x54, 0x50, 0xb8, 0x23, 0x89, 0x0e, 0x4a, 0x89, 0x93, 0xf7, 0x1d,
0x43, 0xb4, 0xf7, 0x66, 0xf4, 0x01, 0x65, 0xd8, 0x45, 0x61, 0xc8, 0x30, 0xe7, 0x4a, 0x67, 0xff,
0xbb, 0x48, 0xb8, 0x42, 0x1c, 0x6b, 0xf4, 0xff, 0x11, 0xa5, 0xa3, 0x08, 0xbb, 0xf2, 0xed, 0x2a,
0xbb, 0x76, 0xc3, 0x8c, 0x21, 0x41, 0x68, 0x62, 0xd4, 0xf3, 0x38, 0x17, 0x2c, 0x0b, 0x74, 0x4d,
0x76, 0x2b, 0x47, 0x11, 0x09, 0x91, 0xc0, 0xae, 0x79, 0x50, 0x40, 0xf7, 0x47, 0x15, 0xec, 0xbc,
0x4c, 0xb9, 0x60, 0x18, 0xc5, 0x17, 0x34, 0x40, 0x11, 0x11, 0xe3, 0xa1, 0x40, 0x82, 0xc3, 0xfb,
0xa0, 0x1e, 0xe9, 0x40, 0xdb, 0xea, 0x58, 0xbd, 0x46, 0xff, 0x1f, 0x67, 0xa6, 0xb3, 0xa2, 0x42,
0xc7, 0x68, 0xbc, 0x92, 0x0c, 0x1f, 0x82, 0x5d, 0x41, 0x05, 0x8a, 0x7c, 0x9e, 0x05, 0x01, 0xe6,
0xfc, 0x3a, 0x8b, 0x7c, 0x86, 0xdf, 0x67, 0x98, 0x0b, 0xde, 0xae, 0x74, 0xac, 0x5e, 0xcd, 0x6b,
0x49, 0xc2, 0xb0, 0xc4, 0x3d, 0x0d, 0xc3, 0x47, 0xc0, 0x56, 0x5a, 0x23, 0xf0, 0x49, 0xe2, 0xa7,
0x8c, 0x8e, 0x8a, 0x39, 0xb5, 0xab, 0x53, 0x62, 0x23, 0x39, 0x4f, 0x2e, 0x35, 0x0c, 0x0f, 0xc1,
0xb6, 0x12, 0x63, 0xc6, 0x28, 0x9b, 0xe4, 0xac, 0x49, 0x19, 0x94, 0xd8, 0x59, 0x01, 0x95, 0xe9,
0xde, 0x80, 0xbf, 0xe4, 0xfe, 0x62, 0x2c, 0x18, 0x09, 0x7c, 0x5e, 0x34, 0xde, 0x5e, 0xe9, 0x54,
0x7b, 0x8d, 0xbe, 0xe3, 0x2c, 0x5d, 0xa3, 0x73, 0xa6, 0x1f, 0x2e, 0x28, 0x0a, 0x9f, 0x4b, 0x99,
0x1c, 0x97, 0xd7, 0x8c, 0x66, 0x03, 0x30, 0x04, 0xad, 0x4c, 0x0f, 0xd6, 0x37, 0x6a, 0x9d, 0x61,
0x4d, 0x66, 0x38, 0xb8, 0x25, 0x83, 0x59, 0x87, 0xc9, 0xa4, 0xbe, 0xbf, 0x93, 0x2d, 0x0b, 0x43,
0x1b, 0xd4, 0x53, 0x46, 0x28, 0x2b, 0xb6, 0xb4, 0xda, 0xb1, 0x7a, 0x9b, 0x5e, 0xf9, 0xde, 0xfd,
0x34, 0xb5, 0xdb, 0x59, 0xd5, 0x31, 0x58, 0xd3, 0xde, 0xd3, 0xab, 0xb5, 0x97, 0xac, 0xf6, 0xb1,
0x62, 0x78, 0x86, 0x0a, 0x8f, 0x40, 0x3d, 0xc6, 0x02, 0x85, 0x48, 0x20, 0x99, 0xab, 0xd1, 0x6f,
0x39, 0xca, 0x75, 0x8e, 0x71, 0x9d, 0x33, 0x94, 0xae, 0xf3, 0x4a, 0xe2, 0x1f, 0x37, 0xc8, 0x40,
0xf7, 0xab, 0x05, 0x5a, 0xb7, 0x90, 0xe1, 0x1e, 0x68, 0xe8, 0x94, 0x09, 0x8a, 0xb1, 0xdc, 0xc8,
0xba, 0x07, 0x54, 0xe8, 0x05, 0x8a, 0x31, 0x7c, 0x0a, 0xee, 0x24, 0x59, 0x3c, 0x99, 0xc2, 0x35,
0x49, 0x08, 0xbf, 0xc1, 0xa1, 0xff, 0x81, 0x88, 0x1b, 0x5d, 0xae, 0x9e, 0xe5, 0x7f, 0x49, 0x16,
0x9b, 0x86, 0x9e, 0x68, 0xda, 0x6b, 0x22, 0x6e, 0x54, 0x3e, 0x78, 0x00, 0x54, 0xe3, 0xa6, 0xc7,
0x1c, 0x45, 0x19, 0x96, 0x93, 0xb4, 0xbc, 0x2d, 0x89, 0x28, 0xe2, 0xab, 0x22, 0xde, 0xfd, 0x52,
0x03, 0x1b, 0x27, 0x51, 0xc6, 0x05, 0x66, 0xaa, 0xd2, 0x03, 0xb0, 0x11, 0xa8, 0xf7, 0xa9, 0x52,
0x07, 0xeb, 0xdf, 0x7e, 0x7e, 0xaf, 0xd6, 0x58, 0xa5, 0x63, 0x79, 0x0d, 0x0d, 0xcb, 0xb2, 0x0f,
0xc1, 0xb6, 0x61, 0x73, 0xcc, 0x72, 0x12, 0x60, 0xa5, 0x5a, 0x95, 0x0d, 0x42, 0x8d, 0x0d, 0x15,
0x24, 0x15, 0xe9, 0xd4, 0x99, 0x31, 0xf7, 0x89, 0xde, 0x43, 0xe5, 0xb7, 0xce, 0xcc, 0xcc, 0x15,
0x36, 0x00, 0x45, 0x61, 0x2b, 0x9f, 0xad, 0x4a, 0xdd, 0x9a, 0x9c, 0x9f, 0xd9, 0x5b, 0xee, 0x18,
0xfc, 0xad, 0x06, 0x12, 0x32, 0x9a, 0xa6, 0x38, 0x9c, 0xf8, 0x44, 0xd9, 0x4b, 0x79, 0xe8, 0x54,
0x81, 0xa5, 0x53, 0xde, 0x81, 0xad, 0x05, 0xbe, 0x32, 0xca, 0xd1, 0x2d, 0x05, 0x4e, 0x8f, 0xd1,
0x99, 0xfb, 0x9c, 0xd7, 0x0c, 0xe7, 0xbe, 0xff, 0x0c, 0x6c, 0x4f, 0xfd, 0x57, 0x7c, 0x92, 0x08,
0xcc, 0x72, 0x14, 0x49, 0xef, 0x36, 0xfa, 0xbb, 0x0b, 0xa7, 0xee, 0x54, 0xff, 0x0b, 0x3c, 0x58,
0xc8, 0x3c, 0xa9, 0x3a, 0xd7, 0x22, 0xfb, 0x2d, 0x68, 0xce, 0xd7, 0x7f, 0x17, 0xd4, 0x03, 0x24,
0xf0, 0x88, 0xb2, 0xf1, 0xe2, 0x0e, 0x4b, 0x08, 0xee, 0x83, 0x4d, 0xd3, 0x66, 0x40, 0xb3, 0x44,
0x68, 0x8f, 0x6d, 0xe8, 0xe0, 0x49, 0x11, 0x1b, 0x3c, 0x00, 0xfb, 0x84, 0xaa, 0xae, 0x53, 0x46,
0x3f, 0x8e, 0x97, 0x0f, 0x60, 0xd0, 0xbc, 0x28, 0x2b, 0xbb, 0x2c, 0xca, 0xbe, 0xb4, 0xae, 0x56,
0x65, 0xfd, 0x47, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, 0xad, 0x11, 0xfc, 0x18, 0x5a, 0x07, 0x00,
0x00,
}
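For orientation, here is a minimal sketch of how these generated messages nest when building a load report. The helper is hypothetical (not part of the generated file) and assumes it lives in the same envoy_api_v2_endpoint package; the cluster name, categories, and counts are made up:

// buildExampleClusterStats is a hypothetical helper illustrating how the
// generated ClusterStats message is assembled.
func buildExampleClusterStats() *ClusterStats {
	return &ClusterStats{
		ClusterName:          "example-cluster", // made-up name
		TotalDroppedRequests: 12,
		DroppedRequests: []*ClusterStats_DroppedRequests{
			{Category: "throttle", DroppedCount: 7},
			{Category: "lb_drop", DroppedCount: 5},
		},
	}
}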


@@ -0,0 +1,272 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: envoy/service/load_stats/v2/lrs.proto
package v2
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import duration "github.com/golang/protobuf/ptypes/duration"
import base "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
import load_report "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/load_report"
import _ "google.golang.org/grpc/balancer/xds/internal/proto/validate"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type LoadStatsRequest struct {
Node *base.Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"`
ClusterStats []*load_report.ClusterStats `protobuf:"bytes,2,rep,name=cluster_stats,json=clusterStats,proto3" json:"cluster_stats,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadStatsRequest) Reset() { *m = LoadStatsRequest{} }
func (m *LoadStatsRequest) String() string { return proto.CompactTextString(m) }
func (*LoadStatsRequest) ProtoMessage() {}
func (*LoadStatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_lrs_0f8f2c1d40a1b9f7, []int{0}
}
func (m *LoadStatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadStatsRequest.Unmarshal(m, b)
}
func (m *LoadStatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadStatsRequest.Marshal(b, m, deterministic)
}
func (dst *LoadStatsRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadStatsRequest.Merge(dst, src)
}
func (m *LoadStatsRequest) XXX_Size() int {
return xxx_messageInfo_LoadStatsRequest.Size(m)
}
func (m *LoadStatsRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LoadStatsRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LoadStatsRequest proto.InternalMessageInfo
func (m *LoadStatsRequest) GetNode() *base.Node {
if m != nil {
return m.Node
}
return nil
}
func (m *LoadStatsRequest) GetClusterStats() []*load_report.ClusterStats {
if m != nil {
return m.ClusterStats
}
return nil
}
type LoadStatsResponse struct {
Clusters []string `protobuf:"bytes,1,rep,name=clusters,proto3" json:"clusters,omitempty"`
LoadReportingInterval *duration.Duration `protobuf:"bytes,2,opt,name=load_reporting_interval,json=loadReportingInterval,proto3" json:"load_reporting_interval,omitempty"`
ReportEndpointGranularity bool `protobuf:"varint,3,opt,name=report_endpoint_granularity,json=reportEndpointGranularity,proto3" json:"report_endpoint_granularity,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadStatsResponse) Reset() { *m = LoadStatsResponse{} }
func (m *LoadStatsResponse) String() string { return proto.CompactTextString(m) }
func (*LoadStatsResponse) ProtoMessage() {}
func (*LoadStatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_lrs_0f8f2c1d40a1b9f7, []int{1}
}
func (m *LoadStatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadStatsResponse.Unmarshal(m, b)
}
func (m *LoadStatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadStatsResponse.Marshal(b, m, deterministic)
}
func (dst *LoadStatsResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadStatsResponse.Merge(dst, src)
}
func (m *LoadStatsResponse) XXX_Size() int {
return xxx_messageInfo_LoadStatsResponse.Size(m)
}
func (m *LoadStatsResponse) XXX_DiscardUnknown() {
xxx_messageInfo_LoadStatsResponse.DiscardUnknown(m)
}
var xxx_messageInfo_LoadStatsResponse proto.InternalMessageInfo
func (m *LoadStatsResponse) GetClusters() []string {
if m != nil {
return m.Clusters
}
return nil
}
func (m *LoadStatsResponse) GetLoadReportingInterval() *duration.Duration {
if m != nil {
return m.LoadReportingInterval
}
return nil
}
func (m *LoadStatsResponse) GetReportEndpointGranularity() bool {
if m != nil {
return m.ReportEndpointGranularity
}
return false
}
func init() {
proto.RegisterType((*LoadStatsRequest)(nil), "envoy.service.load_stats.v2.LoadStatsRequest")
proto.RegisterType((*LoadStatsResponse)(nil), "envoy.service.load_stats.v2.LoadStatsResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// LoadReportingServiceClient is the client API for LoadReportingService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type LoadReportingServiceClient interface {
StreamLoadStats(ctx context.Context, opts ...grpc.CallOption) (LoadReportingService_StreamLoadStatsClient, error)
}
type loadReportingServiceClient struct {
cc *grpc.ClientConn
}
func NewLoadReportingServiceClient(cc *grpc.ClientConn) LoadReportingServiceClient {
return &loadReportingServiceClient{cc}
}
func (c *loadReportingServiceClient) StreamLoadStats(ctx context.Context, opts ...grpc.CallOption) (LoadReportingService_StreamLoadStatsClient, error) {
stream, err := c.cc.NewStream(ctx, &_LoadReportingService_serviceDesc.Streams[0], "/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats", opts...)
if err != nil {
return nil, err
}
x := &loadReportingServiceStreamLoadStatsClient{stream}
return x, nil
}
type LoadReportingService_StreamLoadStatsClient interface {
Send(*LoadStatsRequest) error
Recv() (*LoadStatsResponse, error)
grpc.ClientStream
}
type loadReportingServiceStreamLoadStatsClient struct {
grpc.ClientStream
}
func (x *loadReportingServiceStreamLoadStatsClient) Send(m *LoadStatsRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *loadReportingServiceStreamLoadStatsClient) Recv() (*LoadStatsResponse, error) {
m := new(LoadStatsResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// LoadReportingServiceServer is the server API for LoadReportingService service.
type LoadReportingServiceServer interface {
StreamLoadStats(LoadReportingService_StreamLoadStatsServer) error
}
func RegisterLoadReportingServiceServer(s *grpc.Server, srv LoadReportingServiceServer) {
s.RegisterService(&_LoadReportingService_serviceDesc, srv)
}
func _LoadReportingService_StreamLoadStats_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(LoadReportingServiceServer).StreamLoadStats(&loadReportingServiceStreamLoadStatsServer{stream})
}
type LoadReportingService_StreamLoadStatsServer interface {
Send(*LoadStatsResponse) error
Recv() (*LoadStatsRequest, error)
grpc.ServerStream
}
type loadReportingServiceStreamLoadStatsServer struct {
grpc.ServerStream
}
func (x *loadReportingServiceStreamLoadStatsServer) Send(m *LoadStatsResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *loadReportingServiceStreamLoadStatsServer) Recv() (*LoadStatsRequest, error) {
m := new(LoadStatsRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _LoadReportingService_serviceDesc = grpc.ServiceDesc{
ServiceName: "envoy.service.load_stats.v2.LoadReportingService",
HandlerType: (*LoadReportingServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamLoadStats",
Handler: _LoadReportingService_StreamLoadStats_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "envoy/service/load_stats/v2/lrs.proto",
}
func init() {
proto.RegisterFile("envoy/service/load_stats/v2/lrs.proto", fileDescriptor_lrs_0f8f2c1d40a1b9f7)
}
var fileDescriptor_lrs_0f8f2c1d40a1b9f7 = []byte{
// 429 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x41, 0x8e, 0xd3, 0x30,
0x14, 0x86, 0x71, 0x0b, 0xa8, 0x78, 0x40, 0x80, 0x05, 0x6a, 0xa6, 0x83, 0x50, 0x55, 0x04, 0x04,
0x21, 0x6c, 0x14, 0xf6, 0xb3, 0x28, 0x20, 0x40, 0xaa, 0xd0, 0x90, 0xee, 0xd8, 0x54, 0x6e, 0xf2,
0x88, 0x2c, 0x05, 0xbf, 0x60, 0x3b, 0x16, 0xbd, 0x01, 0x6c, 0x58, 0x70, 0x1c, 0x56, 0x9c, 0x80,
0x7b, 0x70, 0x0b, 0x94, 0x38, 0x99, 0xc9, 0xb0, 0xa8, 0x66, 0x17, 0xeb, 0x7d, 0xff, 0xf3, 0xff,
0xff, 0x31, 0x7d, 0x08, 0xda, 0xe3, 0x4e, 0x58, 0x30, 0x5e, 0x65, 0x20, 0x4a, 0x94, 0xf9, 0xc6,
0x3a, 0xe9, 0xac, 0xf0, 0x89, 0x28, 0x8d, 0xe5, 0x95, 0x41, 0x87, 0xec, 0xa8, 0xc5, 0x78, 0x87,
0xf1, 0x33, 0x8c, 0xfb, 0x64, 0x76, 0x2f, 0xec, 0x90, 0x95, 0x6a, 0x44, 0x19, 0x1a, 0x10, 0x5b,
0x69, 0x21, 0x48, 0x67, 0x8f, 0xcf, 0x4d, 0x41, 0xe7, 0x15, 0x2a, 0xed, 0xc2, 0x4d, 0x06, 0x2a,
0x34, 0xae, 0x03, 0xef, 0x17, 0x88, 0x45, 0x09, 0xa2, 0x3d, 0x6d, 0xeb, 0x4f, 0x22, 0xaf, 0x8d,
0x74, 0x0a, 0x75, 0x37, 0x9f, 0x7a, 0x59, 0xaa, 0x5c, 0x3a, 0x10, 0xfd, 0x47, 0x18, 0x2c, 0xbe,
0x13, 0x7a, 0x6b, 0x85, 0x32, 0x5f, 0x37, 0x86, 0x52, 0xf8, 0x52, 0x83, 0x75, 0xec, 0x29, 0xbd,
0xac, 0x31, 0x87, 0x88, 0xcc, 0x49, 0x7c, 0x90, 0x4c, 0x79, 0x08, 0x20, 0x2b, 0xc5, 0x7d, 0xc2,
0x1b, 0x8f, 0xfc, 0x3d, 0xe6, 0x90, 0xb6, 0x10, 0x7b, 0x4b, 0x6f, 0x64, 0x65, 0x6d, 0x1d, 0x98,
0x90, 0x2a, 0x1a, 0xcd, 0xc7, 0xf1, 0x41, 0xf2, 0xe0, 0xbc, 0xaa, 0xf7, 0xce, 0x5f, 0x06, 0x36,
0xdc, 0x77, 0x3d, 0x1b, 0x9c, 0x16, 0x7f, 0x08, 0xbd, 0x3d, 0xf0, 0x62, 0x2b, 0xd4, 0x16, 0xd8,
0x23, 0x3a, 0xe9, 0x28, 0x1b, 0x91, 0xf9, 0x38, 0xbe, 0xb6, 0xa4, 0xbf, 0xfe, 0xfe, 0x1e, 0x5f,
0xf9, 0x49, 0x46, 0x13, 0x92, 0x9e, 0xce, 0xd8, 0x07, 0x3a, 0x1d, 0xf4, 0xa2, 0x74, 0xb1, 0x51,
0xda, 0x81, 0xf1, 0xb2, 0x8c, 0x46, 0x6d, 0x8e, 0x43, 0x1e, 0x4a, 0xe2, 0x7d, 0x49, 0xfc, 0x55,
0x57, 0x52, 0x7a, 0xb7, 0x51, 0xa6, 0xbd, 0xf0, 0x5d, 0xa7, 0x63, 0xc7, 0xf4, 0x28, 0x6c, 0xdb,
0xf4, 0xf6, 0x37, 0x85, 0x91, 0xba, 0x2e, 0xa5, 0x51, 0x6e, 0x17, 0x8d, 0xe7, 0x24, 0x9e, 0xa4,
0x87, 0x01, 0x79, 0xdd, 0x11, 0x6f, 0xce, 0x80, 0xe4, 0x07, 0xa1, 0x77, 0x56, 0xc3, 0xcd, 0xeb,
0xf0, 0x06, 0x98, 0xa7, 0x37, 0xd7, 0xce, 0x80, 0xfc, 0x7c, 0x1a, 0x97, 0x3d, 0xe3, 0x7b, 0x9e,
0x09, 0xff, 0xff, 0x17, 0xcd, 0xf8, 0x45, 0xf1, 0xd0, 0xe2, 0xe2, 0x52, 0x4c, 0x9e, 0x93, 0xe5,
0x31, 0x7d, 0xa2, 0x30, 0x28, 0x2b, 0x83, 0x5f, 0x77, 0xfb, 0x96, 0x2c, 0x27, 0x2b, 0x63, 0x4f,
0x9a, 0xaa, 0x4e, 0xc8, 0xc7, 0x91, 0x4f, 0xbe, 0x11, 0xb2, 0xbd, 0xda, 0x56, 0xf7, 0xe2, 0x5f,
0x00, 0x00, 0x00, 0xff, 0xff, 0x44, 0xa2, 0xf5, 0xb3, 0xfa, 0x02, 0x00, 0x00,
}

balancer/xds/lrs/lrs.go

@@ -0,0 +1,200 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lrs
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
loadreportpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/load_report"
lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
)
// lrsStore collects loads from the xds balancer and periodically sends them to
// the server.
type lrsStore struct {
serviceName string
node *basepb.Node
backoff backoff.Strategy
lastReported time.Time
drops sync.Map // map[string]*uint64
}
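// grpcHostname is the node metadata key under which the service name is
// reported to the load reporting server (see newStore below).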
const grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
func newStore(serviceName string) *lrsStore {
return &lrsStore{
serviceName: serviceName,
node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
grpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: serviceName},
},
},
},
},
backoff: backoff.Exponential{
MaxDelay: 120 * time.Second,
},
lastReported: time.Now(),
}
}
// Update functions are called by the picker for each RPC. To avoid contention,
// all updates are done atomically.
func (ls *lrsStore) callDropped(category string) {
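// Fast path: the counter for this category usually already exists.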
p, ok := ls.drops.Load(category)
if !ok {
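// Slow path: create the counter. LoadOrStore resolves the race where another
// goroutine stores a counter between the Load above and this call.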
tp := new(uint64)
p, _ = ls.drops.LoadOrStore(category, tp)
}
atomic.AddUint64(p.(*uint64), 1)
}
// TODO: add query counts
// callStarted(l locality)
// callFinished(l locality, err error)
func (ls *lrsStore) buildStats() []*loadreportpb.ClusterStats {
var (
totalDropped uint64
droppedReqs []*loadreportpb.ClusterStats_DroppedRequests
)
ls.drops.Range(func(category, countP interface{}) bool {
tempCount := atomic.SwapUint64(countP.(*uint64), 0)
if tempCount <= 0 {
return true
}
totalDropped += tempCount
droppedReqs = append(droppedReqs, &loadreportpb.ClusterStats_DroppedRequests{
Category: category.(string),
DroppedCount: tempCount,
})
return true
})
dur := time.Since(ls.lastReported)
ls.lastReported = time.Now()
var ret []*loadreportpb.ClusterStats
ret = append(ret, &loadreportpb.ClusterStats{
ClusterName: ls.serviceName,
UpstreamLocalityStats: nil, // TODO: populate this to support per locality loads.
TotalDroppedRequests: totalDropped,
DroppedRequests: droppedReqs,
LoadReportInterval: ptypes.DurationProto(dur),
})
return ret
}
// reportTo makes a streaming lrs call to cc and blocks.
//
// It retries the call (with backoff) until ctx is canceled.
func (ls *lrsStore) reportTo(ctx context.Context, cc *grpc.ClientConn) {
c := lrspb.NewLoadReportingServiceClient(cc)
var (
retryCount int
doBackoff bool
)
for {
select {
case <-ctx.Done():
return
default:
}
if doBackoff {
backoffTimer := time.NewTimer(ls.backoff.Backoff(retryCount))
select {
case <-backoffTimer.C:
case <-ctx.Done():
backoffTimer.Stop()
return
}
retryCount++
}
doBackoff = true
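// Assume this attempt will fail, so the next iteration backs off; doBackoff
// is cleared below once the stream is established and validated.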
stream, err := c.StreamLoadStats(ctx)
if err != nil {
grpclog.Infof("lrs: failed to create stream: %v", err)
continue
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
}); err != nil {
grpclog.Infof("lrs: failed to send first request: %v", err)
continue
}
first, err := stream.Recv()
if err != nil {
grpclog.Infof("lrs: failed to receive first response: %v", err)
continue
}
interval, err := ptypes.Duration(first.LoadReportingInterval)
if err != nil {
grpclog.Infof("lrs: failed to convert report interval: %v", err)
continue
}
if len(first.Clusters) != 1 || first.Clusters[0] != ls.serviceName {
grpclog.Infof("lrs: received clusters %v, expect one cluster %q", first.Clusters, ls.serviceName)
continue
}
if first.ReportEndpointGranularity {
// TODO: add support for per-endpoint loads.
grpclog.Infof("lrs: endpoint loads requested, but not supported by current implementation")
continue
}
// No backoff afterwards.
doBackoff = false
retryCount = 0
ls.sendLoads(ctx, stream, first.Clusters[0], interval)
}
}
func (ls *lrsStore) sendLoads(ctx context.Context, stream lrspb.LoadReportingService_StreamLoadStatsClient, clusterName string, interval time.Duration) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-ctx.Done():
return
}
if err := stream.Send(&lrspb.LoadStatsRequest{
Node: ls.node,
ClusterStats: ls.buildStats(),
}); err != nil {
grpclog.Infof("lrs: failed to send report: %v", err)
return
}
}
}
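
The store is package-private, so the wiring below is only an illustrative sketch of how the xds balancer is expected to use it; ctx, cc, and the category string are placeholders, not part of this commit:

// Hypothetical wiring inside the xds balancer.
store := newStore("my-service")
go store.reportTo(ctx, cc) // cc: an existing *grpc.ClientConn to the management server
// From the picker, on every dropped RPC:
store.callDropped("throttle")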


@@ -0,0 +1,240 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lrs
import (
"context"
"io"
"net"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
durationpb "github.com/golang/protobuf/ptypes/duration"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
loadreportpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/load_report"
lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const testService = "grpc.service.test"
var dropCategories = []string{"drop_for_real", "drop_for_fun"}
// equalClusterStats sorts the dropped requests and clears the report interval
// before comparing.
func equalClusterStats(a, b []*loadreportpb.ClusterStats) bool {
for _, s := range a {
sort.Slice(s.DroppedRequests, func(i, j int) bool {
return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category
})
s.LoadReportInterval = nil
}
for _, s := range b {
sort.Slice(s.DroppedRequests, func(i, j int) bool {
return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category
})
s.LoadReportInterval = nil
}
return reflect.DeepEqual(a, b)
}
func Test_lrsStore_buildStats(t *testing.T) {
tests := []struct {
name string
drops []map[string]uint64
}{
{
name: "one report",
drops: []map[string]uint64{{
dropCategories[0]: 31,
dropCategories[1]: 41,
}},
},
{
name: "two reports",
drops: []map[string]uint64{{
dropCategories[0]: 31,
dropCategories[1]: 41,
}, {
dropCategories[0]: 59,
dropCategories[1]: 26,
}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ls := newStore(testService)
for _, ds := range tt.drops {
var (
totalDropped uint64
droppedReqs []*loadreportpb.ClusterStats_DroppedRequests
)
for cat, count := range ds {
totalDropped += count
droppedReqs = append(droppedReqs, &loadreportpb.ClusterStats_DroppedRequests{
Category: cat,
DroppedCount: count,
})
}
want := []*loadreportpb.ClusterStats{
{
ClusterName: testService,
TotalDroppedRequests: totalDropped,
DroppedRequests: droppedReqs,
},
}
var wg sync.WaitGroup
for c, count := range ds {
for i := 0; i < int(count); i++ {
wg.Add(1)
go func(i int, c string) {
ls.callDropped(c)
wg.Done()
}(i, c)
}
}
wg.Wait()
if got := ls.buildStats(); !equalClusterStats(got, want) {
t.Errorf("lrsStore.buildStats() = %v, want %v", got, want)
t.Errorf("%s", cmp.Diff(got, want))
}
}
})
}
}
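// lrsServer is a fake load reporting server. It validates the initial request,
// sends back the configured reporting interval, and accumulates the drops it
// receives so the test can compare them with what the store reported.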
type lrsServer struct {
mu sync.Mutex
dropTotal uint64
drops map[string]uint64
reportingInterval *durationpb.Duration
}
func (lrss *lrsServer) StreamLoadStats(stream lrspb.LoadReportingService_StreamLoadStatsServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
if !proto.Equal(req, &lrspb.LoadStatsRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
grpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: testService},
},
},
},
},
}) {
return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req)
}
if err := stream.Send(&lrspb.LoadStatsResponse{
Clusters: []string{testService},
LoadReportingInterval: lrss.reportingInterval,
}); err != nil {
return err
}
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
stats := req.ClusterStats[0]
lrss.mu.Lock()
lrss.dropTotal += stats.TotalDroppedRequests
for _, d := range stats.DroppedRequests {
lrss.drops[d.Category] += d.DroppedCount
}
lrss.mu.Unlock()
}
}
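// setupServer starts an in-process gRPC server backed by an lrsServer with the
// given reporting interval, and returns its address plus a cleanup function.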
func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr string, lrss *lrsServer, cleanup func()) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("listen failed due to: %v", err)
}
svr := grpc.NewServer()
lrss = &lrsServer{
drops: make(map[string]uint64),
reportingInterval: reportingInterval,
}
lrspb.RegisterLoadReportingServiceServer(svr, lrss)
go svr.Serve(lis)
return lis.Addr().String(), lrss, func() {
svr.Stop()
lis.Close()
}
}
func Test_lrsStore_reportTo(t *testing.T) {
const intervalNano = 1000 * 1000 * 50
addr, lrss, cleanup := setupServer(t, &durationpb.Duration{
Seconds: 0,
Nanos: intervalNano,
})
defer cleanup()
ls := newStore(testService)
cc, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
ls.reportTo(ctx, cc)
close(done)
}()
drops := map[string]uint64{
dropCategories[0]: 31,
dropCategories[1]: 41,
}
for c, d := range drops {
for i := 0; i < int(d); i++ {
ls.callDropped(c)
time.Sleep(time.Nanosecond * intervalNano / 10)
}
}
time.Sleep(time.Nanosecond * intervalNano * 2)
cancel()
<-done
lrss.mu.Lock()
defer lrss.mu.Unlock()
if !cmp.Equal(lrss.drops, drops) {
t.Errorf("different: %v", cmp.Diff(lrss.drops, drops))
}
}


@@ -0,0 +1,165 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package primitives
import (
"sync"
"sync/atomic"
"testing"
)
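// This file benchmarks three candidate implementations of a concurrent
// map[string]uint64 counter: a mutex-guarded map, an RWMutex map with an
// atomic fast path, and sync.Map with atomic counters (the pattern used by
// lrsStore.drops).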
type incrementUint64Map interface {
increment(string)
result(string) uint64
}
type mapWithLock struct {
mu sync.Mutex
m map[string]uint64
}
func newMapWithLock() incrementUint64Map {
return &mapWithLock{
m: make(map[string]uint64),
}
}
func (mwl *mapWithLock) increment(c string) {
mwl.mu.Lock()
mwl.m[c]++
mwl.mu.Unlock()
}
func (mwl *mapWithLock) result(c string) uint64 {
return mwl.m[c]
}
type mapWithAtomicFastpath struct {
mu sync.RWMutex
m map[string]*uint64
}
func newMapWithAtomicFastpath() incrementUint64Map {
return &mapWithAtomicFastpath{
m: make(map[string]*uint64),
}
}
func (mwaf *mapWithAtomicFastpath) increment(c string) {
mwaf.mu.RLock()
if p, ok := mwaf.m[c]; ok {
atomic.AddUint64(p, 1)
mwaf.mu.RUnlock()
return
}
mwaf.mu.RUnlock()
mwaf.mu.Lock()
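// Re-check under the write lock: another goroutine may have created the
// counter while the read lock was released.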
if p, ok := mwaf.m[c]; ok {
atomic.AddUint64(p, 1)
mwaf.mu.Unlock()
return
}
var temp uint64 = 1
mwaf.m[c] = &temp
mwaf.mu.Unlock()
}
func (mwaf *mapWithAtomicFastpath) result(c string) uint64 {
return atomic.LoadUint64(mwaf.m[c])
}
type mapWithSyncMap struct {
m sync.Map
}
func newMapWithSyncMap() incrementUint64Map {
return &mapWithSyncMap{}
}
func (mwsm *mapWithSyncMap) increment(c string) {
p, ok := mwsm.m.Load(c)
if !ok {
tp := new(uint64)
p, _ = mwsm.m.LoadOrStore(c, tp)
}
atomic.AddUint64(p.(*uint64), 1)
}
func (mwsm *mapWithSyncMap) result(c string) uint64 {
p, _ := mwsm.m.Load(c)
return atomic.LoadUint64(p.(*uint64))
}
func benchmarkIncrementUint64Map(b *testing.B, f func() incrementUint64Map) {
const cat = "cat"
benches := []struct {
name string
goroutineCount int
}{
{
name: " 1",
goroutineCount: 1,
},
{
name: " 10",
goroutineCount: 10,
},
{
name: " 100",
goroutineCount: 100,
},
{
name: "1000",
goroutineCount: 1000,
},
}
for _, bb := range benches {
b.Run(bb.name, func(b *testing.B) {
m := f()
var wg sync.WaitGroup
wg.Add(bb.goroutineCount)
b.ResetTimer()
for i := 0; i < bb.goroutineCount; i++ {
go func() {
for j := 0; j < b.N; j++ {
m.increment(cat)
}
wg.Done()
}()
}
wg.Wait()
b.StopTimer()
if m.result(cat) != uint64(bb.goroutineCount*b.N) {
b.Fatalf("result is %d, want %d", m.result(cat), bb.goroutineCount*b.N)
}
})
}
}
func BenchmarkMapWithSyncMutexContention(b *testing.B) {
benchmarkIncrementUint64Map(b, newMapWithLock)
}
func BenchmarkMapWithAtomicFastpath(b *testing.B) {
benchmarkIncrementUint64Map(b, newMapWithAtomicFastpath)
}
func BenchmarkMapWithSyncMap(b *testing.B) {
benchmarkIncrementUint64Map(b, newMapWithSyncMap)
}