mirror of https://github.com/grpc/grpc-go.git
interop-testing: add soak test cases to interop client (#4677)
This commit is contained in:
parent
a6a63177ae
commit
1fe5adbbf8
|
@ -26,6 +26,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
_ "google.golang.org/grpc/balancer/grpclb"
|
||||
|
@ -48,20 +49,24 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
|
||||
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true")
|
||||
useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)")
|
||||
customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS")
|
||||
altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address")
|
||||
testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root")
|
||||
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
|
||||
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
|
||||
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
|
||||
serverHost = flag.String("server_host", "localhost", "The server host name")
|
||||
serverPort = flag.Int("server_port", 10000, "The server port number")
|
||||
serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.")
|
||||
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
|
||||
testCase = flag.String("test_case", "large_unary",
|
||||
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
|
||||
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true")
|
||||
useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)")
|
||||
customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS")
|
||||
altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address")
|
||||
testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root")
|
||||
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
|
||||
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
|
||||
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
|
||||
serverHost = flag.String("server_host", "localhost", "The server host name")
|
||||
serverPort = flag.Int("server_port", 10000, "The server port number")
|
||||
serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.")
|
||||
soakIterations = flag.Int("soak_iterations", 10, "The number of iterations to use for the two soak tests: rpc_soak and channel_soak")
|
||||
soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).")
|
||||
soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
|
||||
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
|
||||
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
|
||||
testCase = flag.String("test_case", "large_unary",
|
||||
`Configure different test cases. Valid options are:
|
||||
empty_unary : empty (zero bytes) request and response;
|
||||
large_unary : single request and (large) response;
|
||||
|
@ -292,6 +297,12 @@ func main() {
|
|||
case "pick_first_unary":
|
||||
interop.DoPickFirstUnary(tc)
|
||||
logger.Infoln("PickFirstUnary done")
|
||||
case "rpc_soak":
|
||||
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
|
||||
logger.Infoln("RpcSoak done")
|
||||
case "channel_soak":
|
||||
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
|
||||
logger.Infoln("ChannelSoak done")
|
||||
default:
|
||||
logger.Fatal("Unsupported test case: ", *testCase)
|
||||
}
|
||||
|
|
|
@ -20,10 +20,12 @@
|
|||
package interop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -31,6 +33,7 @@ import (
|
|||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/google"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/benchmark/stats"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
@ -673,6 +676,91 @@ func DoPickFirstUnary(tc testgrpc.TestServiceClient) {
|
|||
}
|
||||
}
|
||||
|
||||
func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption) (latency time.Duration, err error) {
|
||||
start := time.Now()
|
||||
client := tc
|
||||
if resetChannel {
|
||||
var conn *grpc.ClientConn
|
||||
conn, err = grpc.Dial(serverAddr, dopts...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
client = testgrpc.NewTestServiceClient(conn)
|
||||
}
|
||||
// per test spec, don't include channel shutdown in latency measurement
|
||||
defer func() { latency = time.Since(start) }()
|
||||
// do a large-unary RPC
|
||||
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
|
||||
req := &testpb.SimpleRequest{
|
||||
ResponseType: testpb.PayloadType_COMPRESSABLE,
|
||||
ResponseSize: int32(largeRespSize),
|
||||
Payload: pl,
|
||||
}
|
||||
var reply *testpb.SimpleResponse
|
||||
reply, err = client.UnaryCall(ctx, req)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
|
||||
return
|
||||
}
|
||||
t := reply.GetPayload().GetType()
|
||||
s := len(reply.GetPayload().GetBody())
|
||||
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
|
||||
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
|
||||
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
|
||||
// stub that is created with the provided server address and dial options.
|
||||
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, overallDeadline time.Time) {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
|
||||
defer cancel()
|
||||
iterationsDone := 0
|
||||
totalFailures := 0
|
||||
hopts := stats.HistogramOptions{
|
||||
NumBuckets: 20,
|
||||
GrowthFactor: 1,
|
||||
BaseBucketSize: 1,
|
||||
MinValue: 0,
|
||||
}
|
||||
h := stats.NewHistogram(hopts)
|
||||
for i := 0; i < soakIterations; i++ {
|
||||
if time.Now().After(overallDeadline) {
|
||||
break
|
||||
}
|
||||
iterationsDone++
|
||||
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts)
|
||||
latencyMs := int64(latency / time.Millisecond)
|
||||
h.Add(latencyMs)
|
||||
if err != nil {
|
||||
totalFailures++
|
||||
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d failed: %s\n", i, latencyMs, err)
|
||||
continue
|
||||
}
|
||||
if latency > perIterationMaxAcceptableLatency {
|
||||
totalFailures++
|
||||
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d exceeds max acceptable latency: %d\n", i, latencyMs, perIterationMaxAcceptableLatency.Milliseconds())
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d succeeded\n", i, latencyMs)
|
||||
}
|
||||
var b bytes.Buffer
|
||||
h.Print(&b)
|
||||
fmt.Fprintln(os.Stderr, "Histogram of per-iteration latencies in milliseconds:")
|
||||
fmt.Fprintln(os.Stderr, b.String())
|
||||
fmt.Fprintf(os.Stderr, "soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", iterationsDone, soakIterations, totalFailures, maxFailures)
|
||||
if iterationsDone < soakIterations {
|
||||
logger.Fatalf("soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", overallDeadline.Sub(start).Seconds(), iterationsDone, soakIterations)
|
||||
}
|
||||
if totalFailures > maxFailures {
|
||||
logger.Fatalf("soak test total failures: %d exceeds max failures threshold: %d.", totalFailures, maxFailures)
|
||||
}
|
||||
}
|
||||
|
||||
type testServer struct {
|
||||
testgrpc.UnimplementedTestServiceServer
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue