balancer: add server loads from RPC trailers to DoneInfo (#2641)

This commit is contained in:
Menghan Li 2019-04-02 11:15:36 -07:00 committed by GitHub
parent 924457101b
commit d389f9fac6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 636 additions and 0 deletions

View File

@ -183,6 +183,11 @@ type DoneInfo struct {
BytesSent bool
// BytesReceived indicates if any byte has been received from the server.
BytesReceived bool
// ServerLoad is the load received from server. It's usually sent as part of
// trailing metadata.
//
// The only supported type now is *orca_v1.LoadReport.
ServerLoad interface{}
}
var (

View File

@ -0,0 +1,46 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package balancerload defines APIs to parse server loads in trailers. The
// parsed loads are sent to balancers in DoneInfo.
package balancerload
import (
"google.golang.org/grpc/metadata"
)
// Parser converts loads from metadata into a concrete type.
type Parser interface {
// Parse parses loads from metadata.
Parse(md metadata.MD) interface{}
}
var parser Parser
// SetParser sets the load parser.
//
// Not mutex-protected, should be called before any gRPC functions.
func SetParser(lr Parser) {
parser = lr
}
// Parse calls parser.Read().
func Parse(md metadata.MD) interface{} {
if parser == nil {
return nil
}
return parser.Parse(md)
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate protoc -I ./orca_v1 --go_out=plugins=grpc:./orca_v1 ./orca_v1/orca.proto
// Package orca implements Open Request Cost Aggregation.
package orca
import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload"
orcapb "google.golang.org/grpc/internal/balancerload/orca/orca_v1"
"google.golang.org/grpc/metadata"
)
const mdKey = "X-Endpoint-Load-Metrics-Bin"
// toBytes converts a orca load report into bytes.
func toBytes(r *orcapb.LoadReport) []byte {
if r == nil {
return nil
}
b, err := proto.Marshal(r)
if err != nil {
grpclog.Warningf("orca: failed to marshal load report: %v", err)
return nil
}
return b
}
// ToMetadata converts a orca load report into grpc metadata.
func ToMetadata(r *orcapb.LoadReport) metadata.MD {
b := toBytes(r)
if b == nil {
return nil
}
return metadata.Pairs(mdKey, string(b))
}
// fromBytes reads load report bytes and converts it to orca.
func fromBytes(b []byte) *orcapb.LoadReport {
ret := new(orcapb.LoadReport)
if err := proto.Unmarshal(b, ret); err != nil {
grpclog.Warningf("orca: failed to unmarshal load report: %v", err)
return nil
}
return ret
}
// FromMetadata reads load report from metadata and converts it to orca.
//
// It returns nil if report is not found in metadata.
func FromMetadata(md metadata.MD) *orcapb.LoadReport {
vs := md.Get(mdKey)
if len(vs) == 0 {
return nil
}
return fromBytes([]byte(vs[0]))
}
type loadParser struct{}
func (*loadParser) Parse(md metadata.MD) interface{} {
return FromMetadata(md)
}
func init() {
balancerload.SetParser(&loadParser{})
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package orca
import (
"reflect"
"strings"
"testing"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/internal/balancerload/orca/orca_v1"
"google.golang.org/grpc/metadata"
)
var (
testMessage = &orca_v1.LoadReport{
CpuUtilization: 0.1,
MemUtilization: 0.2,
NicInUtilization: 0,
NicOutUtilization: 0,
RequestCostOrUtilization: map[string]float64{"ttt": 0.4},
}
testBytes, _ = proto.Marshal(testMessage)
)
func TestToMetadata(t *testing.T) {
tests := []struct {
name string
r *orca_v1.LoadReport
want metadata.MD
}{{
name: "nil",
r: nil,
want: nil,
}, {
name: "valid",
r: testMessage,
want: metadata.MD{
strings.ToLower(mdKey): []string{string(testBytes)},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ToMetadata(tt.r); !reflect.DeepEqual(got, tt.want) {
t.Errorf("ToMetadata() = %v, want %v", got, tt.want)
}
})
}
}
func TestFromMetadata(t *testing.T) {
tests := []struct {
name string
md metadata.MD
want *orca_v1.LoadReport
}{{
name: "nil",
md: nil,
want: nil,
}, {
name: "valid",
md: metadata.MD{
strings.ToLower(mdKey): []string{string(testBytes)},
},
want: testMessage,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := FromMetadata(tt.md); !proto.Equal(got, tt.want) {
t.Errorf("FromMetadata() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,293 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: orca.proto
package orca_v1
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import duration "github.com/golang/protobuf/ptypes/duration"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type LoadReport struct {
// CPU utilization expressed as a fraction of available CPU resources. This
// should be derived from a sample or measurement taken during the request.
CpuUtilization float64 `protobuf:"fixed64,1,opt,name=cpu_utilization,json=cpuUtilization,proto3" json:"cpu_utilization,omitempty"`
// Memory utilization expressed as a fraction of available memory
// resources. This should be derived from a sample or measurement taken
// during the request.
MemUtilization float64 `protobuf:"fixed64,2,opt,name=mem_utilization,json=memUtilization,proto3" json:"mem_utilization,omitempty"`
// NIC inbound/outbound utilization expressed as a fraction of available NIC
// bandwidth. The request in/out bytes can be inferred by Envoy, but not the
// NIC availability at the endpoint, hence reporting
NicInUtilization float64 `protobuf:"fixed64,3,opt,name=nic_in_utilization,json=nicInUtilization,proto3" json:"nic_in_utilization,omitempty"`
NicOutUtilization float64 `protobuf:"fixed64,4,opt,name=nic_out_utilization,json=nicOutUtilization,proto3" json:"nic_out_utilization,omitempty"`
// Application specific requests costs. Values may be absolute costs (e.g.
// 3487 bytes of storage) associated with the cost or utilization,
// expressed as a fraction of total resources available. Utilization
// metrics should be derived from a sample or measurement taken
// during the request.
RequestCostOrUtilization map[string]float64 `protobuf:"bytes,5,rep,name=request_cost_or_utilization,json=requestCostOrUtilization,proto3" json:"request_cost_or_utilization,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"fixed64,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadReport) Reset() { *m = LoadReport{} }
func (m *LoadReport) String() string { return proto.CompactTextString(m) }
func (*LoadReport) ProtoMessage() {}
func (*LoadReport) Descriptor() ([]byte, []int) {
return fileDescriptor_orca_542539e3bf435293, []int{0}
}
func (m *LoadReport) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadReport.Unmarshal(m, b)
}
func (m *LoadReport) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadReport.Marshal(b, m, deterministic)
}
func (dst *LoadReport) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadReport.Merge(dst, src)
}
func (m *LoadReport) XXX_Size() int {
return xxx_messageInfo_LoadReport.Size(m)
}
func (m *LoadReport) XXX_DiscardUnknown() {
xxx_messageInfo_LoadReport.DiscardUnknown(m)
}
var xxx_messageInfo_LoadReport proto.InternalMessageInfo
func (m *LoadReport) GetCpuUtilization() float64 {
if m != nil {
return m.CpuUtilization
}
return 0
}
func (m *LoadReport) GetMemUtilization() float64 {
if m != nil {
return m.MemUtilization
}
return 0
}
func (m *LoadReport) GetNicInUtilization() float64 {
if m != nil {
return m.NicInUtilization
}
return 0
}
func (m *LoadReport) GetNicOutUtilization() float64 {
if m != nil {
return m.NicOutUtilization
}
return 0
}
func (m *LoadReport) GetRequestCostOrUtilization() map[string]float64 {
if m != nil {
return m.RequestCostOrUtilization
}
return nil
}
type LoadReportRequest struct {
// Interval for generating Open RCA core metric responses.
ReportInterval *duration.Duration `protobuf:"bytes,1,opt,name=report_interval,json=reportInterval,proto3" json:"report_interval,omitempty"`
// Request costs to collect. If this is empty, all known requests costs tracked by
// the load reporting agent will be returned. This provides an opportunity for
// the client to selectively obtain a subset of tracked costs.
RequestCostNames []string `protobuf:"bytes,2,rep,name=request_cost_names,json=requestCostNames,proto3" json:"request_cost_names,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadReportRequest) Reset() { *m = LoadReportRequest{} }
func (m *LoadReportRequest) String() string { return proto.CompactTextString(m) }
func (*LoadReportRequest) ProtoMessage() {}
func (*LoadReportRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_orca_542539e3bf435293, []int{1}
}
func (m *LoadReportRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadReportRequest.Unmarshal(m, b)
}
func (m *LoadReportRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadReportRequest.Marshal(b, m, deterministic)
}
func (dst *LoadReportRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadReportRequest.Merge(dst, src)
}
func (m *LoadReportRequest) XXX_Size() int {
return xxx_messageInfo_LoadReportRequest.Size(m)
}
func (m *LoadReportRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LoadReportRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LoadReportRequest proto.InternalMessageInfo
func (m *LoadReportRequest) GetReportInterval() *duration.Duration {
if m != nil {
return m.ReportInterval
}
return nil
}
func (m *LoadReportRequest) GetRequestCostNames() []string {
if m != nil {
return m.RequestCostNames
}
return nil
}
func init() {
proto.RegisterType((*LoadReport)(nil), "orca.v1.LoadReport")
proto.RegisterMapType((map[string]float64)(nil), "orca.v1.LoadReport.RequestCostOrUtilizationEntry")
proto.RegisterType((*LoadReportRequest)(nil), "orca.v1.LoadReportRequest")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// OpenRCAServiceClient is the client API for OpenRCAService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type OpenRCAServiceClient interface {
StreamCoreMetrics(ctx context.Context, in *LoadReportRequest, opts ...grpc.CallOption) (OpenRCAService_StreamCoreMetricsClient, error)
}
type openRCAServiceClient struct {
cc *grpc.ClientConn
}
func NewOpenRCAServiceClient(cc *grpc.ClientConn) OpenRCAServiceClient {
return &openRCAServiceClient{cc}
}
func (c *openRCAServiceClient) StreamCoreMetrics(ctx context.Context, in *LoadReportRequest, opts ...grpc.CallOption) (OpenRCAService_StreamCoreMetricsClient, error) {
stream, err := c.cc.NewStream(ctx, &_OpenRCAService_serviceDesc.Streams[0], "/orca.v1.OpenRCAService/StreamCoreMetrics", opts...)
if err != nil {
return nil, err
}
x := &openRCAServiceStreamCoreMetricsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type OpenRCAService_StreamCoreMetricsClient interface {
Recv() (*LoadReport, error)
grpc.ClientStream
}
type openRCAServiceStreamCoreMetricsClient struct {
grpc.ClientStream
}
func (x *openRCAServiceStreamCoreMetricsClient) Recv() (*LoadReport, error) {
m := new(LoadReport)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// OpenRCAServiceServer is the server API for OpenRCAService service.
type OpenRCAServiceServer interface {
StreamCoreMetrics(*LoadReportRequest, OpenRCAService_StreamCoreMetricsServer) error
}
func RegisterOpenRCAServiceServer(s *grpc.Server, srv OpenRCAServiceServer) {
s.RegisterService(&_OpenRCAService_serviceDesc, srv)
}
func _OpenRCAService_StreamCoreMetrics_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LoadReportRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(OpenRCAServiceServer).StreamCoreMetrics(m, &openRCAServiceStreamCoreMetricsServer{stream})
}
type OpenRCAService_StreamCoreMetricsServer interface {
Send(*LoadReport) error
grpc.ServerStream
}
type openRCAServiceStreamCoreMetricsServer struct {
grpc.ServerStream
}
func (x *openRCAServiceStreamCoreMetricsServer) Send(m *LoadReport) error {
return x.ServerStream.SendMsg(m)
}
var _OpenRCAService_serviceDesc = grpc.ServiceDesc{
ServiceName: "orca.v1.OpenRCAService",
HandlerType: (*OpenRCAServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamCoreMetrics",
Handler: _OpenRCAService_StreamCoreMetrics_Handler,
ServerStreams: true,
},
},
Metadata: "orca.proto",
}
func init() { proto.RegisterFile("orca.proto", fileDescriptor_orca_542539e3bf435293) }
var fileDescriptor_orca_542539e3bf435293 = []byte{
// 373 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcd, 0x6b, 0xe3, 0x30,
0x10, 0xc5, 0xd7, 0xf6, 0x66, 0x97, 0x28, 0x90, 0x0f, 0x65, 0x0f, 0x59, 0x2f, 0xbb, 0x84, 0x5c,
0x36, 0x87, 0x45, 0xd9, 0xa4, 0x97, 0xd2, 0x5b, 0x9b, 0x16, 0x1a, 0xfa, 0x11, 0x50, 0xe8, 0xa5,
0x17, 0xe3, 0x28, 0xd3, 0x20, 0x6a, 0x4b, 0xae, 0x2c, 0x19, 0xd2, 0x7b, 0xff, 0xea, 0x5e, 0x8a,
0x65, 0x97, 0xd8, 0x90, 0xf6, 0x26, 0xbd, 0xf9, 0xbd, 0x61, 0xe6, 0x0d, 0x42, 0x52, 0xb1, 0x90,
0x24, 0x4a, 0x6a, 0x89, 0xbf, 0xdb, 0x77, 0x36, 0xf5, 0xff, 0x6c, 0xa5, 0xdc, 0x46, 0x30, 0xb1,
0xf2, 0xda, 0x3c, 0x4c, 0x36, 0x46, 0x85, 0x9a, 0x4b, 0x51, 0x80, 0xa3, 0x57, 0x17, 0xa1, 0x6b,
0x19, 0x6e, 0x28, 0x24, 0x52, 0x69, 0xfc, 0x17, 0x75, 0x58, 0x62, 0x02, 0xa3, 0x79, 0xc4, 0x9f,
0x2d, 0x37, 0x70, 0x86, 0xce, 0xd8, 0xa1, 0x6d, 0x96, 0x98, 0xbb, 0xbd, 0x9a, 0x83, 0x31, 0xc4,
0x35, 0xd0, 0x2d, 0xc0, 0x18, 0xe2, 0x2a, 0xf8, 0x0f, 0x61, 0xc1, 0x59, 0xc0, 0x45, 0x8d, 0xf5,
0x2c, 0xdb, 0x15, 0x9c, 0x2d, 0x44, 0x95, 0x26, 0xa8, 0x9f, 0xd3, 0xd2, 0xe8, 0x1a, 0xfe, 0xd5,
0xe2, 0x3d, 0xc1, 0xd9, 0xd2, 0xe8, 0x2a, 0x9f, 0xa0, 0x5f, 0x0a, 0x9e, 0x0c, 0xa4, 0x3a, 0x60,
0x32, 0xd5, 0x81, 0x54, 0x35, 0x5f, 0x63, 0xe8, 0x8d, 0x5b, 0xb3, 0x29, 0x29, 0xd3, 0x20, 0xfb,
0x4d, 0x09, 0x2d, 0x6c, 0x73, 0x99, 0xea, 0xa5, 0xaa, 0xb4, 0xbc, 0x10, 0x5a, 0xed, 0xe8, 0x40,
0x7d, 0x50, 0xf6, 0xaf, 0xd0, 0xef, 0x4f, 0xad, 0xb8, 0x8b, 0xbc, 0x47, 0xd8, 0xd9, 0xd8, 0x9a,
0x34, 0x7f, 0xe2, 0x1f, 0xa8, 0x91, 0x85, 0x91, 0x81, 0x32, 0xa1, 0xe2, 0x73, 0xe2, 0x1e, 0x3b,
0xa3, 0x17, 0x07, 0xf5, 0xf6, 0x33, 0x95, 0x7d, 0xf1, 0x19, 0xea, 0x28, 0x2b, 0x04, 0x5c, 0x68,
0x50, 0x59, 0x18, 0xd9, 0x6e, 0xad, 0xd9, 0x4f, 0x52, 0x5c, 0x93, 0xbc, 0x5f, 0x93, 0x9c, 0x97,
0xd7, 0xa4, 0xed, 0xc2, 0xb1, 0x28, 0x0d, 0x79, 0xec, 0xb5, 0x60, 0x44, 0x18, 0x43, 0x3a, 0x70,
0x87, 0xde, 0xb8, 0x49, 0xbb, 0x95, 0xe5, 0x6e, 0x73, 0x7d, 0x76, 0x8f, 0xda, 0xcb, 0x04, 0x04,
0x9d, 0x9f, 0xae, 0x40, 0x65, 0x9c, 0x01, 0xbe, 0x44, 0xbd, 0x95, 0x56, 0x10, 0xc6, 0x73, 0xa9,
0xe0, 0x06, 0xb4, 0xe2, 0x2c, 0xc5, 0xfe, 0x81, 0x20, 0xcb, 0xa1, 0xfd, 0xfe, 0x81, 0xda, 0xe8,
0xcb, 0x7f, 0x67, 0xfd, 0xcd, 0x0e, 0x7b, 0xf4, 0x16, 0x00, 0x00, 0xff, 0xff, 0xc0, 0xda, 0x2d,
0xb7, 0x9f, 0x02, 0x00, 0x00,
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
package orca.v1;
import "google/protobuf/duration.proto";
message LoadReport {
// CPU utilization expressed as a fraction of available CPU resources. This
// should be derived from a sample or measurement taken during the request.
double cpu_utilization = 1;
// Memory utilization expressed as a fraction of available memory
// resources. This should be derived from a sample or measurement taken
// during the request.
double mem_utilization = 2;
// NIC inbound/outbound utilization expressed as a fraction of available NIC
// bandwidth. The request in/out bytes can be inferred by Envoy, but not the
// NIC availability at the endpoint, hence reporting
double nic_in_utilization = 3;
double nic_out_utilization = 4;
// Application specific requests costs. Values may be absolute costs (e.g.
// 3487 bytes of storage) associated with the cost or utilization,
// expressed as a fraction of total resources available. Utilization
// metrics should be derived from a sample or measurement taken
// during the request.
map<string, double> request_cost_or_utilization = 5;
}
message LoadReportRequest {
// Interval for generating Open RCA core metric responses.
google.protobuf.Duration report_interval = 1;
// Request costs to collect. If this is empty, all known requests costs tracked by
// the load reporting agent will be returned. This provides an opportunity for
// the client to selectively obtain a subset of tracked costs.
repeated string request_cost_names = 2;
}
service OpenRCAService {
rpc StreamCoreMetrics(LoadReportRequest) returns (stream LoadReport) {
}
}

View File

@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
@ -940,6 +941,7 @@ func (a *csAttempt) finish(err error) {
Trailer: tr,
BytesSent: a.s != nil,
BytesReceived: br,
ServerLoad: balancerload.Parse(tr),
})
}
if a.statsHandler != nil {

View File

@ -24,14 +24,19 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload/orca"
orcapb "google.golang.org/grpc/internal/balancerload/orca/orca_v1"
"google.golang.org/grpc/resolver"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata"
_ "google.golang.org/grpc/internal/balancerload/orca"
)
const testBalancerName = "testbalancer"
@ -194,3 +199,56 @@ func testDoneInfo(t *testing.T, e env) {
t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo))
}
}
func (s) TestDoneLoads(t *testing.T) {
for _, e := range listTestEnv() {
testDoneLoads(t, e)
}
}
func testDoneLoads(t *testing.T, e env) {
b := &testBalancer{}
balancer.Register(b)
testLoad := &orcapb.LoadReport{
CpuUtilization: 0.31,
MemUtilization: 0.41,
NicInUtilization: 0.59,
NicOutUtilization: 0.26,
RequestCostOrUtilization: nil,
}
ss := &stubServer{
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
grpc.SetTrailer(ctx, orca.ToMetadata(testLoad))
return &testpb.Empty{}, nil
},
}
if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil {
t.Fatalf("error starting testing server: %v", err)
}
defer ss.Stop()
tc := testpb.NewTestServiceClient(ss.cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil)
}
poWant := []balancer.PickOptions{
{FullMethodName: "/grpc.testing.TestService/EmptyCall"},
}
if !reflect.DeepEqual(b.pickOptions, poWant) {
t.Fatalf("b.pickOptions = %v; want %v", b.pickOptions, poWant)
}
if len(b.doneInfo) < 1 {
t.Fatalf("b.doneInfo = %v, want length 1", b.doneInfo)
}
gotLoad, _ := b.doneInfo[0].ServerLoad.(*orcapb.LoadReport)
if !proto.Equal(gotLoad, testLoad) {
t.Fatalf("b.doneInfo[0].ServerLoad = %v; want = %v", b.doneInfo[0].ServerLoad, testLoad)
}
}