interop: hold lock on server for OOB metrics updates; share 30s timeout (#6277)

This commit is contained in:
Doug Fawley 2023-05-12 14:09:59 -07:00 committed by GitHub
parent 68381e7bd2
commit 5dcfb37c0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 12 deletions

View File

@ -45,6 +45,20 @@ pass () {
echo "$(tput setaf 2) $(date): $1 $(tput sgr 0)"
}
withTimeout () {
timer=$1
shift
# Run command in the background.
cmd=$(printf '%q ' "$@")
eval "$cmd" &
wpid=$!
# Kill after 20 seconds.
sleep $timer && kill $wpid &
# Wait for the background thread.
wait $wpid
}
# Don't run some tests that need a special environment:
# "google_default_credentials"
# "compute_engine_channel_credentials"
@ -70,6 +84,8 @@ CASES=(
"custom_metadata"
"unimplemented_method"
"unimplemented_service"
"orca_per_rpc"
"orca_oob"
)
# Build server
@ -96,7 +112,12 @@ for case in ${CASES[@]}; do
echo "$(tput setaf 4) $(date): testing: ${case} $(tput sgr 0)"
CLIENT_LOG="$(mktemp)"
if ! GRPC_GO_LOG_SEVERITY_LEVEL=info timeout 20 go run ./interop/client --use_tls --server_host_override=foo.test.google.fr --use_test_ca --test_case="${case}" &> $CLIENT_LOG; then
if ! GRPC_GO_LOG_SEVERITY_LEVEL=info withTimeout 20 go run ./interop/client \
--use_tls \
--server_host_override=foo.test.google.fr \
--use_test_ca --test_case="${case}" \
--service_config_json='{ "loadBalancingConfig": [{ "test_backend_metrics_load_balancer": {} }]}' \
&> $CLIENT_LOG; then
fail "FAIL: test case ${case}
got server log:
$(cat $SERVER_LOG)

View File

@ -30,6 +30,7 @@ import (
"io"
"os"
"strings"
"sync"
"time"
"github.com/golang/protobuf/proto"
@ -779,6 +780,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D
type testServer struct {
testgrpc.UnimplementedTestServiceServer
orcaMu sync.Mutex
metricsRecorder orca.ServerMetricsRecorder
}
@ -842,11 +844,6 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
// recorder in the context, if present.
setORCAMetrics(r, orcaData)
}
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
setORCAMetrics(r, orcaData)
}
return &testpb.SimpleResponse{
Payload: pl,
}, nil
@ -912,6 +909,7 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe
stream.SetTrailer(trailer)
}
}
hasORCALock := false
for {
in, err := stream.Recv()
if err == io.EOF {
@ -929,6 +927,11 @@ func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallSe
if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
// Transfer the request's OOB ORCA data to the server metrics recorder
// in the server, if present.
if !hasORCALock {
s.orcaMu.Lock()
defer s.orcaMu.Unlock()
hasORCALock = true
}
setORCAMetrics(r, orcaData)
}
@ -1036,14 +1039,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}
ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want := &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.8210,
MemUtilization: 0.5847,
Utilization: map[string]float64{"util": 0.30499},
}
checkORCAMetrics(ctx2, tc, want)
checkORCAMetrics(ctx, tc, want)
err = stream.Send(&testpb.StreamingOutputCallRequest{
OrcaOobReport: &testpb.TestOrcaReport{
@ -1061,14 +1062,12 @@ func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
}
ctx3, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
want = &v3orcapb.OrcaLoadReport{
CpuUtilization: 0.29309,
MemUtilization: 0.2,
Utilization: map[string]float64{"util": 0.2039},
}
checkORCAMetrics(ctx3, tc, want)
checkORCAMetrics(ctx, tc, want)
}
func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {