diff --git a/interop/interop_test.sh b/interop/interop_test.sh index 99e12c3c3..65e12a16b 100755 --- a/interop/interop_test.sh +++ b/interop/interop_test.sh @@ -45,6 +45,20 @@ pass () { echo "$(tput setaf 2) $(date): $1 $(tput sgr 0)" } +withTimeout () { + timer=$1 + shift + + # Run command in the background. + cmd=$(printf '%q ' "$@") + eval "$cmd" & + wpid=$! + # Kill after 20 seconds. + sleep $timer && kill $wpid & + # Wait for the background thread. + wait $wpid +} + # Don't run some tests that need a special environment: # "google_default_credentials" # "compute_engine_channel_credentials" @@ -70,6 +84,8 @@ CASES=( "custom_metadata" "unimplemented_method" "unimplemented_service" + "orca_per_rpc" + "orca_oob" ) # Build server @@ -96,7 +112,12 @@ for case in ${CASES[@]}; do echo "$(tput setaf 4) $(date): testing: ${case} $(tput sgr 0)" CLIENT_LOG="$(mktemp)" - if ! GRPC_GO_LOG_SEVERITY_LEVEL=info timeout 20 go run ./interop/client --use_tls --server_host_override=foo.test.google.fr --use_test_ca --test_case="${case}" &> $CLIENT_LOG; then + if ! GRPC_GO_LOG_SEVERITY_LEVEL=info withTimeout 20 go run ./interop/client \ + --use_tls \ + --server_host_override=foo.test.google.fr \ + --use_test_ca --test_case="${case}" \ + --service_config_json='{ "loadBalancingConfig": [{ "test_backend_metrics_load_balancer": {} }]}' \ + &> $CLIENT_LOG; then fail "FAIL: test case ${case} got server log: $(cat $SERVER_LOG) diff --git a/interop/test_utils.go b/interop/test_utils.go index 0057c0712..29916876e 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -30,6 +30,7 @@ import ( "io" "os" "strings" + "sync" "time" "github.com/golang/protobuf/proto" @@ -779,6 +780,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D type testServer struct { testgrpc.UnimplementedTestServiceServer + orcaMu sync.Mutex metricsRecorder orca.ServerMetricsRecorder } @@ -842,11 +844,6 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* // recorder in the context, if present. setORCAMetrics(r, orcaData) } - if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil { - // Transfer the request's OOB ORCA data to the server metrics recorder - // in the server, if present. - setORCAMetrics(r, orcaData) - } return &testpb.SimpleResponse{ Payload: pl, }, nil @@ -912,6 +909,7 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe stream.SetTrailer(trailer) } } + hasORCALock := false for { in, err := stream.Recv() if err == io.EOF { @@ -929,6 +927,11 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil { // Transfer the request's OOB ORCA data to the server metrics recorder // in the server, if present. + if !hasORCALock { + s.orcaMu.Lock() + defer s.orcaMu.Unlock() + hasORCALock = true + } setORCAMetrics(r, orcaData) } @@ -1036,14 +1039,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) { logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err) } - ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() want := &v3orcapb.OrcaLoadReport{ CpuUtilization: 0.8210, MemUtilization: 0.5847, Utilization: map[string]float64{"util": 0.30499}, } - checkORCAMetrics(ctx2, tc, want) + checkORCAMetrics(ctx, tc, want) err = stream.Send(&testpb.StreamingOutputCallRequest{ OrcaOobReport: &testpb.TestOrcaReport{ @@ -1061,14 +1062,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) { logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err) } - ctx3, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() want = &v3orcapb.OrcaLoadReport{ CpuUtilization: 0.29309, MemUtilization: 0.2, Utilization: map[string]float64{"util": 0.2039}, } - checkORCAMetrics(ctx3, tc, want) + checkORCAMetrics(ctx, tc, want) } func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {