interop: let the interop client send additional metadata, controlled by a flag (#6295)

This commit is contained in:
apolcyn 2023-05-22 15:32:29 -07:00 committed by GitHub
parent 9b7a947cdc
commit 511a96359f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 43 additions and 0 deletions

View File

@ -24,12 +24,14 @@
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"net"
"os"
"strconv"
"strings"
"time"
"golang.org/x/oauth2"
@ -41,6 +43,7 @@ import (
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/testdata"
@ -75,6 +78,7 @@ var (
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
testCase = flag.String("test_case", "large_unary",
`Configure different test cases. Valid options are:
empty_unary : empty (zero bytes) request and response;
@ -115,6 +119,34 @@ const (
credsComputeEngineCreds
)
// Parses the --additional_metadata flag and returns metadata to send on each RPC,
// formatted as per https://pkg.go.dev/google.golang.org/grpc/metadata#Pairs.
// Allow any character but semicolons in values. If the flag is empty, return a nil map.
func parseAdditionalMetadataFlag() []string {
if len(*additionalMetadata) == 0 {
return nil
}
r := *additionalMetadata
addMd := make([]string, 0)
for len(r) > 0 {
i := strings.Index(r, ":")
if i < 0 {
logger.Fatalf("Error parsing --additional_metadata flag: missing colon separator")
}
addMd = append(addMd, r[:i]) // append key
r = r[i+1:]
i = strings.Index(r, ";")
// append value
if i < 0 {
addMd = append(addMd, r)
break
}
addMd = append(addMd, r[:i])
r = r[i+1:]
}
return addMd
}
func main() {
flag.Parse()
logger.Infof("Client running with test case %q", *testCase)
@ -214,6 +246,17 @@ func main() {
if len(*serviceConfigJSON) > 0 {
opts = append(opts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(*serviceConfigJSON))
}
if addMd := parseAdditionalMetadataFlag(); addMd != nil {
unaryAddMd := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = metadata.AppendToOutgoingContext(ctx, addMd...)
return invoker(ctx, method, req, reply, cc, opts...)
}
streamingAddMd := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = metadata.AppendToOutgoingContext(ctx, addMd...)
return streamer(ctx, desc, cc, method, opts...)
}
opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
}
conn, err := grpc.Dial(serverAddr, opts...)
if err != nil {
logger.Fatalf("Fail to dial: %v", err)