interop: improve rpc_soak and channel_soak test to cover concurrency in Go (#8025)

Author: zbilun, 2025-01-23 10:42:29 -08:00 (committed by GitHub)
parent 8cf8fd1433
commit 897818ae26
6 changed files with 268 additions and 114 deletions


@ -28,6 +28,7 @@ import (
"crypto/tls"
"crypto/x509"
"flag"
"log"
"net"
"os"
"strconv"
@ -79,6 +80,7 @@ var (
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). Defaults to 1, i.e. sequential execution.")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
@ -149,6 +151,21 @@ func parseAdditionalMetadataFlag() []string {
return addMd
}
// createBaseSoakConfig creates the shared configuration structure used by both soak tests.
func createBaseSoakConfig(serverAddr string) interop.SoakTestConfig {
return interop.SoakTestConfig{
RequestSize: *soakRequestSize,
ResponseSize: *soakResponseSize,
PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
MinTimeBetweenRPCs: time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second,
ServerAddr: serverAddr,
NumWorkers: *soakNumThreads,
Iterations: *soakIterations,
MaxFailures: *soakMaxFailures,
}
}
func main() {
flag.Parse()
logger.Infof("Client running with test case %q", *testCase)
@ -261,7 +278,7 @@ func main() {
}
opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
}
conn, err := grpc.Dial(serverAddr, opts...)
conn, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
logger.Fatalf("Fail to dial: %v", err)
}
@ -358,10 +375,20 @@ func main() {
interop.DoPickFirstUnary(ctx, tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
rpcSoakConfig := createBaseSoakConfig(serverAddr)
rpcSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) { return conn, func() {} }
interop.DoSoakTest(ctxWithDeadline, rpcSoakConfig)
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
channelSoakConfig := createBaseSoakConfig(serverAddr)
channelSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) {
cc, err := grpc.NewClient(serverAddr, opts...)
if err != nil {
log.Fatalf("Failed to create shared channel: %v", err)
}
return cc, func() { cc.Close() }
}
interop.DoSoakTest(ctxWithDeadline, channelSoakConfig)
logger.Infoln("ChannelSoak done")
case "orca_per_rpc":
interop.DoORCAPerRPCTest(ctx, tc)


@ -92,6 +92,8 @@ CASES=(
"unimplemented_service"
"orca_per_rpc"
"orca_oob"
"rpc_soak"
"channel_soak"
)
# Build server

interop/soak_tests.go (new file, 202 lines)

@ -0,0 +1,202 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package interop
import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/peer"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
type SoakWorkerResults struct {
IterationsDone int
Failures int
Latencies *stats.Histogram
}
// SoakIterationConfig holds the parameters required for a single soak iteration.
type SoakIterationConfig struct {
RequestSize int // The size of the request payload in bytes.
ResponseSize int // The expected size of the response payload in bytes.
Client testgrpc.TestServiceClient // The gRPC client to make the call.
CallOptions []grpc.CallOption // Call options for the RPC.
}
// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
RequestSize int // Request payload size in bytes.
ResponseSize int // Expected response payload size in bytes.
PerIterationMaxAcceptableLatency time.Duration // Maximum acceptable latency for a single iteration.
MinTimeBetweenRPCs time.Duration // Minimum time between the starts of consecutive RPCs on a worker.
OverallTimeout time.Duration // Wall-clock budget for the whole test.
ServerAddr string // Server address, included in log output.
NumWorkers int // Number of concurrent worker goroutines.
Iterations int // Total number of iterations, split evenly across workers.
MaxFailures int // Maximum number of failed iterations tolerated.
ChannelForTest func() (*grpc.ClientConn, func()) // Returns the channel to use for an iteration plus its cleanup function.
}
func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
start := time.Now()
// Do a large-unary RPC.
// Create the request payload.
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(config.ResponseSize),
Payload: pl,
}
// Perform the gRPC call.
var reply *testpb.SimpleResponse
reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return 0, err
}
// Validate response.
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
return 0, err
}
// Calculate latency and return result.
latency = time.Since(start)
return latency, nil
}
func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
timeoutDuration := config.OverallTimeout
soakIterationsPerWorker := config.Iterations / config.NumWorkers
if soakWorkerResults.Latencies == nil {
soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
})
}
for i := 0; i < soakIterationsPerWorker; i++ {
if ctx.Err() != nil {
return
}
if time.Since(startTime) >= timeoutDuration {
fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
return
}
earliestNextStart := time.After(config.MinTimeBetweenRPCs)
currentChannel, cleanup := config.ChannelForTest()
// Cleanup is deferred until this worker has finished all of its iterations, so per-iteration channels stay open until the worker exits.
defer cleanup()
client := testgrpc.NewTestServiceClient(currentChannel)
var p peer.Peer
iterationConfig := SoakIterationConfig{
RequestSize: config.RequestSize,
ResponseSize: config.ResponseSize,
Client: client,
CallOptions: []grpc.CallOption{grpc.Peer(&p)},
}
latency, err := doOneSoakIteration(ctx, iterationConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
soakWorkerResults.Failures++
<-earliestNextStart
continue
}
if latency > config.PerIterationMaxAcceptableLatency {
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency, p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
soakWorkerResults.Failures++
<-earliestNextStart
continue
}
// Success: log the details of the iteration.
soakWorkerResults.Latencies.Add(latency.Milliseconds())
soakWorkerResults.IterationsDone++
fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency, p.Addr, config.ServerAddr)
<-earliestNextStart
}
}
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// Iterations are split evenly across soakConfig.NumWorkers goroutines. The channel for each iteration is obtained from
// soakConfig.ChannelForTest: rpc_soak passes a closure that returns a single shared channel, while channel_soak passes
// one that creates a fresh channel (and its cleanup function) on every call.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n")
}
startTime := time.Now()
var wg sync.WaitGroup
soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
for i := 0; i < soakConfig.NumWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
}(i)
}
// Wait for all goroutines to complete.
wg.Wait()
// Handle results.
totalIterations := 0
totalFailures := 0
latencies := stats.NewHistogram(stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
})
for _, worker := range soakWorkerResults {
totalIterations += worker.IterationsDone
totalFailures += worker.Failures
if worker.Latencies != nil {
// Add latencies from the worker's Histogram to the main latencies.
latencies.Merge(worker.Latencies)
}
}
var b bytes.Buffer
latencies.Print(&b)
fmt.Fprintf(os.Stderr,
"(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
soakConfig.ServerAddr, totalIterations, soakConfig.Iterations, totalFailures, b.String())
if totalIterations != soakConfig.Iterations {
fmt.Fprintf(os.Stderr, "Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
}
if totalFailures > soakConfig.MaxFailures {
fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
}
if soakConfig.ChannelForTest != nil {
_, cleanup := soakConfig.ChannelForTest()
defer cleanup()
}
}
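
For reference, here is a minimal, self-contained sketch (not part of this commit) of how a caller could drive the DoSoakTest API defined above in channel_soak style. The server address, credential choice, and the concrete numbers below are illustrative assumptions, not values taken from the diff:

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/interop"
)

func main() {
	// Assumed target: an interop test server already listening here with plaintext credentials.
	addr := "localhost:10000"
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	cfg := interop.SoakTestConfig{
		RequestSize:                      271828,
		ResponseSize:                     314159,
		PerIterationMaxAcceptableLatency: time.Second,
		MinTimeBetweenRPCs:               10 * time.Millisecond,
		OverallTimeout:                   10 * time.Second,
		ServerAddr:                       addr,
		NumWorkers:                       2,  // two concurrent workers
		Iterations:                       10, // evenly divisible by NumWorkers
		MaxFailures:                      0,
		// channel_soak style: a fresh channel (and its cleanup) per call.
		ChannelForTest: func() (*grpc.ClientConn, func()) {
			cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
			if err != nil {
				panic(err)
			}
			return cc, func() { cc.Close() }
		},
	}
	interop.DoSoakTest(ctx, cfg)
}

For rpc_soak-style reuse, the ChannelForTest closure would instead return one shared *grpc.ClientConn with a no-op cleanup, exactly as the interop client change above does.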


@ -24,7 +24,6 @@
package interop
import (
"bytes"
"context"
"fmt"
"io"
@ -36,12 +35,10 @@ import (
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@ -684,100 +681,6 @@ func DoPickFirstUnary(ctx context.Context, tc testgrpc.TestServiceClient) {
}
}
func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
start := time.Now()
client := tc
if resetChannel {
var conn *grpc.ClientConn
conn, err = grpc.Dial(serverAddr, dopts...)
if err != nil {
return
}
defer conn.Close()
client = testgrpc.NewTestServiceClient(conn)
}
// per test spec, don't include channel shutdown in latency measurement
defer func() { latency = time.Since(start) }()
// do a large-unary RPC
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(soakResponseSize),
Payload: pl,
}
var reply *testpb.SimpleResponse
reply, err = client.UnaryCall(ctx, req, copts...)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize)
return
}
return
}
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
func DoSoakTest(ctx context.Context, tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration) {
start := time.Now()
var elapsedTime float64
iterationsDone := 0
totalFailures := 0
hopts := stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
}
h := stats.NewHistogram(hopts)
for i := 0; i < soakIterations; i++ {
if ctx.Err() != nil {
elapsedTime = time.Since(start).Seconds()
break
}
earliestNextStart := time.After(minTimeBetweenRPCs)
iterationsDone++
var p peer.Peer
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)})
latencyMs := int64(latency / time.Millisecond)
h.Add(latencyMs)
if err != nil {
totalFailures++
addrStr := "nil"
if p.Addr != nil {
addrStr = p.Addr.String()
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
<-earliestNextStart
continue
}
if latency > perIterationMaxAcceptableLatency {
totalFailures++
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
<-earliestNextStart
continue
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
<-earliestNextStart
}
var b bytes.Buffer
h.Print(&b)
fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
if iterationsDone < soakIterations {
logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, elapsedTime, iterationsDone, soakIterations)
}
if totalFailures > maxFailures {
logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
}
}
type testServer struct {
testgrpc.UnimplementedTestServiceServer


@ -22,6 +22,7 @@ package main
import (
"context"
"flag"
"log"
"strings"
"sync"
"time"
@ -54,6 +55,7 @@ var (
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). Defaults to 1, i.e. sequential execution.")
testCase = flag.String("test_case", "rpc_soak",
`Configure different test cases. Valid options are:
rpc_soak: sends --soak_iterations large_unary RPCs;
@ -63,6 +65,7 @@ var (
)
type clientConfig struct {
conn *grpc.ClientConn
tc testgrpc.TestServiceClient
opts []grpc.DialOption
uri string
@ -81,17 +84,6 @@ func main() {
logger.Fatalf("Unsupported credentials type: %v", c)
}
}
var resetChannel bool
switch *testCase {
case "rpc_soak":
resetChannel = false
case "channel_soak":
resetChannel = true
default:
logger.Fatal("Unsupported test case: ", *testCase)
}
// create clients as specified in flags
var clients []clientConfig
for i := range uris {
var opts []grpc.DialOption
@ -101,12 +93,13 @@ func main() {
case insecureCredsName:
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
cc, err := grpc.Dial(uris[i], opts...)
cc, err := grpc.NewClient(uris[i], opts...)
if err != nil {
logger.Fatalf("Fail to dial %v: %v", uris[i], err)
}
defer cc.Close()
clients = append(clients, clientConfig{
conn: cc,
tc: testgrpc.NewTestServiceClient(cc),
opts: opts,
uri: uris[i],
@ -116,13 +109,40 @@ func main() {
// run soak tests with the different clients
logger.Infof("Clients running with test case %q", *testCase)
var wg sync.WaitGroup
var channelForTest func() (*grpc.ClientConn, func())
ctx := context.Background()
for i := range clients {
wg.Add(1)
go func(c clientConfig) {
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Duration(*soakOverallTimeoutSeconds)*time.Second)
defer cancel()
interop.DoSoakTest(ctxWithDeadline, c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
switch *testCase {
case "rpc_soak":
channelForTest = func() (*grpc.ClientConn, func()) { return c.conn, func() {} }
case "channel_soak":
channelForTest = func() (*grpc.ClientConn, func()) {
cc, err := grpc.NewClient(c.uri, c.opts...)
if err != nil {
log.Fatalf("Failed to create shared channel: %v", err)
}
return cc, func() { cc.Close() }
}
default:
logger.Fatal("Unsupported test case: ", *testCase)
}
soakConfig := interop.SoakTestConfig{
RequestSize: *soakRequestSize,
ResponseSize: *soakResponseSize,
PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
MinTimeBetweenRPCs: time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second,
ServerAddr: c.uri,
NumWorkers: *soakNumThreads,
Iterations: *soakIterations,
MaxFailures: *soakMaxFailures,
ChannelForTest: channelForTest,
}
interop.DoSoakTest(ctxWithDeadline, soakConfig)
logger.Infof("%s test done for server: %s", *testCase, c.uri)
wg.Done()
}(clients[i])


@ -49,7 +49,7 @@ git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Exampl
# - Do not use time.After except in tests. It has the potential to leak the
# timer since there is no way to stop it early.
git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|test_utils\|testutils'
git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|soak_tests\|testutils'
# - Do not use "interface{}"; use "any" instead.
git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc\|grpc_testing_not_regenerated'