diff --git a/interop/client/client.go b/interop/client/client.go index 8238e0a10..845ad79de 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -77,6 +77,8 @@ var ( soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.") soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.") soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS") + soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.") + soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.") tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.") additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.") testCase = flag.String("test_case", "large_unary", @@ -352,10 +354,10 @@ func main() { interop.DoPickFirstUnary(tc) logger.Infoln("PickFirstUnary done") case "rpc_soak": - interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) logger.Infoln("RpcSoak done") case "channel_soak": - interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) logger.Infoln("ChannelSoak done") case "orca_per_rpc": interop.DoORCAPerRPCTest(tc) diff --git a/interop/test_utils.go b/interop/test_utils.go index 29916876e..7b597cae4 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -684,7 +684,7 @@ func DoPickFirstUnary(tc testgrpc.TestServiceClient) { } } -func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) { +func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) { start := time.Now() client := tc if resetChannel { @@ -699,10 +699,10 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese // per test spec, don't include channel shutdown in latency measurement defer func() { latency = time.Since(start) }() // do a large-unary RPC - pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) + pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize) req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseSize: int32(largeRespSize), + ResponseSize: int32(soakResponseSize), Payload: pl, } var reply *testpb.SimpleResponse @@ -713,8 +713,8 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese } t := reply.GetPayload().GetType() s := len(reply.GetPayload().GetBody()) - if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize { - err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize) + if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize { + err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize) return } return @@ -723,7 +723,8 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese // DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. // If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new // stub that is created with the provided server address and dial options. -func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) { +// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method. +func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) { start := time.Now() ctx, cancel := context.WithDeadline(context.Background(), overallDeadline) defer cancel() @@ -743,7 +744,7 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D earliestNextStart := time.After(minTimeBetweenRPCs) iterationsDone++ var p peer.Peer - latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts, []grpc.CallOption{grpc.Peer(&p)}) + latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)}) latencyMs := int64(latency / time.Millisecond) h.Add(latencyMs) if err != nil { diff --git a/interop/xds_federation/client.go b/interop/xds_federation/client.go index 31ec9bba7..eee5ba747 100644 --- a/interop/xds_federation/client.go +++ b/interop/xds_federation/client.go @@ -51,6 +51,8 @@ var ( soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.") soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.") soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS") + soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.") + soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.") testCase = flag.String("test_case", "rpc_soak", `Configure different test cases. Valid options are: rpc_soak: sends --soak_iterations large_unary RPCs; @@ -116,7 +118,7 @@ func main() { for i := range clients { wg.Add(1) go func(c clientConfig) { - interop.DoSoakTest(c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + interop.DoSoakTest(c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) logger.Infof("%s test done for server: %s", *testCase, c.uri) wg.Done() }(clients[i]) diff --git a/interop/xds_federation/xds_federation b/interop/xds_federation/xds_federation new file mode 100755 index 000000000..5901b7c48 Binary files /dev/null and b/interop/xds_federation/xds_federation differ