interop: add ORCA test cases and functionality (#6266)

This commit is contained in:
Doug Fawley 2023-05-10 13:26:37 -07:00 committed by GitHub
parent 5e587344ee
commit b3fbd87a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 357 additions and 5 deletions

View File

@ -17,6 +17,10 @@
*/
// Binary client is an interop client.
//
// See interop test case descriptions [here].
//
// [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
package main
import (
@ -94,7 +98,9 @@ var (
custom_metadata: server will echo custom metadata;
unimplemented_method: client attempts to call unimplemented method;
unimplemented_service: client attempts to call unimplemented service;
pick_first_unary: all requests are sent to one server despite multiple servers are resolved.`)
pick_first_unary: all requests are sent to one server despite multiple servers are resolved;
orca_per_rpc: the client verifies ORCA per-RPC metrics are provided;
orca_oob: the client verifies ORCA out-of-band metrics are provided.`)
logger = grpclog.Component("interop")
)
@ -308,6 +314,12 @@ func main() {
case "channel_soak":
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(tc)
logger.Infoln("ORCAPerRPC done")
case "orca_oob":
interop.DoORCAOOBTest(tc)
logger.Infoln("ORCAOOB done")
default:
logger.Fatal("Unsupported test case: ", *testCase)
}

View File

@ -18,6 +18,8 @@ require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.12 // indirect
github.com/aws/aws-sdk-go v1.44.162 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect

View File

@ -638,6 +638,7 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 h1:58f1tJ1ra+zFINPlwLWvQsR9CzAKt2e+EWV2yX9oXQ4=
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -651,6 +652,7 @@ github.com/envoyproxy/go-control-plane v0.11.1-0.20230406144219-ba92d50b6596/go.
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo=
github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8=
github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=

170
interop/orcalb.go Normal file
View File

@ -0,0 +1,170 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package interop
import (
"context"
"fmt"
"sync"
"time"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/orca"
)
func init() {
balancer.Register(orcabb{})
}
type orcabb struct{}
func (orcabb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &orcab{cc: cc}
}
func (orcabb) Name() string {
return "test_backend_metrics_load_balancer"
}
type orcab struct {
cc balancer.ClientConn
sc balancer.SubConn
cancelWatch func()
reportMu sync.Mutex
report *v3orcapb.OrcaLoadReport
}
func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
if o.sc != nil {
o.sc.UpdateAddresses(s.ResolverState.Addresses)
return nil
}
if len(s.ResolverState.Addresses) == 0 {
o.ResolverError(fmt.Errorf("produced no addresses"))
return fmt.Errorf("resolver produced no addresses")
}
var err error
o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
return nil
}
o.cancelWatch = orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
o.sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
return nil
}
func (o *orcab) ResolverError(err error) {
if o.sc == nil {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
}
}
func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
if o.sc != sc {
logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc)
return
}
switch scState.ConnectivityState {
case connectivity.Ready:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}})
case connectivity.TransientFailure:
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))})
case connectivity.Connecting:
// Ignore; picker already set to "connecting".
case connectivity.Idle:
sc.Connect()
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
case connectivity.Shutdown:
// Ignore; we are closing but handle that in Close instead.
}
}
func (o *orcab) Close() {
o.cancelWatch()
}
func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
o.reportMu.Lock()
defer o.reportMu.Unlock()
logger.Infof("received OOB load report: %v", r)
o.report = r
}
type scPicker struct {
sc balancer.SubConn
o *orcab
}
func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
doneCB := func(di balancer.DoneInfo) {
if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
(lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
// Since all RPCs will respond with a load report due to the
// presence of the DialOption, we need to inspect every field and
// use the out-of-band report instead if all are unset/zero.
setContextCMR(info.Ctx, lr)
} else {
p.o.reportMu.Lock()
defer p.o.reportMu.Unlock()
if lr := p.o.report; lr != nil {
setContextCMR(info.Ctx, lr)
}
}
}
return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil
}
func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
if r := orcaResultFromContext(ctx); r != nil {
*r = lr
}
}
type orcaKey string
var orcaCtxKey = orcaKey("orcaResult")
// contextWithORCAResult sets a key in ctx with a pointer to an ORCA load
// report that is to be filled in by the "test_backend_metrics_load_balancer"
// LB policy's Picker's Done callback.
//
// If a per-call load report is provided from the server for the call, result
// will be filled with that, otherwise the most recent OOB load report is used.
// If no OOB report has been received, result is not modified.
func contextWithORCAResult(ctx context.Context, result **v3orcapb.OrcaLoadReport) context.Context {
return context.WithValue(ctx, orcaCtxKey, result)
}
// orcaResultFromContext returns the ORCA load report stored in the context.
// The LB policy uses this to communicate the load report back to the interop
// client application.
func orcaResultFromContext(ctx context.Context) **v3orcapb.OrcaLoadReport {
v := ctx.Value(orcaCtxKey)
if v == nil {
return nil
}
return v.(**v3orcapb.OrcaLoadReport)
}

View File

@ -17,18 +17,25 @@
*/
// Binary server is an interop server.
//
// See interop test case descriptions [here].
//
// [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
package main
import (
"flag"
"net"
"strconv"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/alts"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/testdata"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
@ -56,7 +63,7 @@ func main() {
logger.Fatalf("failed to listen: %v", err)
}
logger.Infof("interop server listening on %v", lis.Addr())
var opts []grpc.ServerOption
opts := []grpc.ServerOption{orca.CallMetricsServerOption(nil)}
if *useTLS {
if *certFile == "" {
*certFile = testdata.Path("server1.pem")
@ -78,6 +85,13 @@ func main() {
opts = append(opts, grpc.Creds(altsTC))
}
server := grpc.NewServer(opts...)
testgrpc.RegisterTestServiceServer(server, interop.NewTestServer())
metricsRecorder := orca.NewServerMetricsRecorder()
sopts := orca.ServiceOptions{
MinReportingInterval: time.Second,
ServerMetricsProvider: metricsRecorder,
}
internal.ORCAAllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&sopts)
orca.Register(server, sopts)
testgrpc.RegisterTestServiceServer(server, interop.NewTestServer(interop.NewTestServerOptions{MetricsRecorder: metricsRecorder}))
server.Serve(lis)
}

View File

@ -17,6 +17,10 @@
*/
// Package interop contains functions used by interop client/server.
//
// See interop test case descriptions [here].
//
// [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
package interop
import (
@ -36,9 +40,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
@ -772,10 +778,23 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D
type testServer struct {
testgrpc.UnimplementedTestServiceServer
metricsRecorder orca.ServerMetricsRecorder
}
// NewTestServer creates a test server for test service.
func NewTestServer() testgrpc.TestServiceServer {
// NewTestServerOptions contains options that control the behavior of the test
// server returned by NewTestServer.
type NewTestServerOptions struct {
MetricsRecorder orca.ServerMetricsRecorder
}
// NewTestServer creates a test server for test service. opts carries optional
// settings and does not need to be provided. If multiple opts are provided,
// only the first one is used.
func NewTestServer(opts ...NewTestServerOptions) testgrpc.TestServiceServer {
if len(opts) > 0 {
return &testServer{metricsRecorder: opts[0].MetricsRecorder}
}
return &testServer{}
}
@ -818,11 +837,34 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
if err != nil {
return nil, err
}
if r, orcaData := orca.CallMetricsRecorderFromContext(ctx), in.GetOrcaPerQueryReport(); r != nil && orcaData != nil {
// Transfer the request's per-Call ORCA data to the call metrics
// recorder in the context, if present.
setORCAMetrics(r, orcaData)
}
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
setORCAMetrics(r, orcaData)
}
return &testpb.SimpleResponse{
Payload: pl,
}, nil
}
func setORCAMetrics(r orca.ServerMetricsRecorder, orcaData *testpb.TestOrcaReport) {
r.SetCPUUtilization(orcaData.CpuUtilization)
r.SetMemoryUtilization(orcaData.MemoryUtilization)
if rq, ok := r.(orca.CallMetricsRecorder); ok {
for k, v := range orcaData.RequestCost {
rq.SetRequestCost(k, v)
}
}
for k, v := range orcaData.Utilization {
r.SetNamedUtilization(k, v)
}
}
func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
cs := args.GetResponseParameters()
for _, c := range cs {
@ -883,6 +925,13 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe
if st != nil && st.Code != 0 {
return status.Error(codes.Code(st.Code), st.Message)
}
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
setORCAMetrics(r, orcaData)
}
cs := in.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
@ -933,3 +982,106 @@ func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallSe
}
return nil
}
// DoORCAPerRPCTest performs a unary RPC that enables ORCA per-call reporting
// and verifies the load report sent back to the LB policy's Done callback.
func DoORCAPerRPCTest(tc testgrpc.TestServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
orcaRes := &v3orcapb.OrcaLoadReport{}
_, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{
OrcaPerQueryReport: &testpb.TestOrcaReport{
CpuUtilization: 0.8210,
MemoryUtilization: 0.5847,
RequestCost: map[string]float64{"cost": 3456.32},
Utilization: map[string]float64{"util": 0.30499},
},
})
if err != nil {
logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
}
want := &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.8210,
MemUtilization: 0.5847,
RequestCost: map[string]float64{"cost": 3456.32},
Utilization: map[string]float64{"util": 0.30499},
}
if !proto.Equal(orcaRes, want) {
logger.Fatalf("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
}
}
// DoORCAOOBTest performs a streaming RPC that enables ORCA OOB reporting and
// verifies the load report sent to the LB policy's OOB listener.
func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := tc.FullDuplexCall(ctx)
if err != nil {
logger.Fatalf("/TestService/FullDuplexCall received error starting stream: %v", err)
}
err = stream.Send(&testpb.StreamingOutputCallRequest{
OrcaOobReport: &testpb.TestOrcaReport{
CpuUtilization: 0.8210,
MemoryUtilization: 0.5847,
Utilization: map[string]float64{"util": 0.30499},
},
ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
})
if err != nil {
logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
}
_, err = stream.Recv()
if err != nil {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}
ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want := &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.8210,
MemUtilization: 0.5847,
Utilization: map[string]float64{"util": 0.30499},
}
checkORCAMetrics(ctx2, tc, want)
err = stream.Send(&testpb.StreamingOutputCallRequest{
OrcaOobReport: &testpb.TestOrcaReport{
CpuUtilization: 0.29309,
MemoryUtilization: 0.2,
Utilization: map[string]float64{"util": 0.2039},
},
ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
})
if err != nil {
logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
}
_, err = stream.Recv()
if err != nil {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}
ctx3, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want = &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.29309,
MemUtilization: 0.2,
Utilization: map[string]float64{"util": 0.2039},
}
checkORCAMetrics(ctx3, tc, want)
}
func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {
for ctx.Err() == nil {
orcaRes := &v3orcapb.OrcaLoadReport{}
if _, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{}); err != nil {
logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
}
if proto.Equal(orcaRes, want) {
return
}
logger.Infof("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
time.Sleep(time.Second)
}
logger.Fatalf("timed out waiting for expected ORCA load report")
}