interop: implement rpc-behavior for UnaryCall() (#6575)

This commit is contained in:
Aditya Sood 2023-09-27 23:13:03 +05:30 committed by GitHub
parent c6264a9f90
commit fd9ef7263a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 2 deletions

View File

@ -66,7 +66,7 @@ func (s) TestCustomLB(t *testing.T) {
}
rpcBMD := md.Get("rpc-behavior")
if len(rpcBMD) != 1 {
errCh.Send(fmt.Errorf("received %d values for metadata key rpc-behavior, want 1", len(rpcBMD)))
errCh.Send(fmt.Errorf("received %d values for metadata key \"rpc-behavior\", want 1", len(rpcBMD)))
return &testpb.SimpleResponse{}, nil
}
wantVal := "error-code-0"

View File

@ -26,12 +26,17 @@ import (
"log"
"net"
"os"
"strconv"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
"google.golang.org/grpc/internal/status"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/xds"
@ -53,6 +58,16 @@ var (
logger = grpclog.Component("interop")
)
const (
rpcBehaviorMDKey = "rpc-behavior"
grpcPreviousRPCAttemptsMDKey = "grpc-previous-rpc-attempts"
sleepPfx = "sleep-"
keepOpenVal = "keep-open"
errorCodePfx = "error-code-"
succeedOnRetryPfx = "succeed-on-retry-attempt-"
hostnamePfx = "hostname="
)
func getHostname() string {
if *hostNameOverride != "" {
return *hostNameOverride
@ -78,8 +93,101 @@ func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*test
}
func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
response := &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}
forLoop:
for _, headerVal := range getRPCBehaviorMetadata(ctx) {
// A value can have a prefix "hostname=<string>" followed by a space.
// In that case, the rest of the value should only be applied
// if the specified hostname matches the server's hostname.
if strings.HasPrefix(headerVal, hostnamePfx) {
splitVal := strings.Split(headerVal, " ")
if len(splitVal) <= 1 {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'hostname=<string> <header>=<value>' instead", headerVal)
}
if s.hostname != splitVal[0][len(hostnamePfx):] {
continue forLoop
}
headerVal = splitVal[1]
}
switch {
// If the value matches "sleep-<int>", the server should wait
// the specified number of seconds before resuming
// behavior matching and RPC processing.
case strings.HasPrefix(headerVal, sleepPfx):
sleep, err := strconv.Atoi(headerVal[len(sleepPfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'sleep-<int>' instead", headerVal)
}
time.Sleep(time.Duration(sleep) * time.Second)
// If the value matches "keep-open", the server should
// never respond to the request and behavior matching ends.
case strings.HasPrefix(headerVal, keepOpenVal):
<-ctx.Done()
return nil, nil
// If the value matches "error-code-<int>", the server should
// respond with the specified status code and behavior matching ends.
case strings.HasPrefix(headerVal, errorCodePfx):
code, err := strconv.Atoi(headerVal[len(errorCodePfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'error-code-<int>' instead", headerVal)
}
return nil, status.Errorf(codes.Code(code), "rpc failed as per the rpc-behavior header value: %v", headerVal)
// If the value matches "success-on-retry-attempt-<int>", and the
// value of the "grpc-previous-rpc-attempts" metadata field is equal to
// the specified number, the normal RPC processing should resume
// and behavior matching ends.
case strings.HasPrefix(headerVal, succeedOnRetryPfx):
wantRetry, err := strconv.Atoi(headerVal[len(succeedOnRetryPfx):])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for rpc-behavior header %v, must be 'success-on-retry-attempt-<int>' instead", headerVal)
}
mdRetry := getMetadataValues(ctx, grpcPreviousRPCAttemptsMDKey)
curRetry, err := strconv.Atoi(mdRetry[0])
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid format for grpc-previous-rpc-attempts header: %v", mdRetry[0])
}
if curRetry == wantRetry {
break forLoop
}
}
}
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}, nil
return response, status.Err(codes.OK, "")
}
func getRPCBehaviorMetadata(ctx context.Context) []string {
mdRPCBehavior := getMetadataValues(ctx, rpcBehaviorMDKey)
var rpcBehaviorMetadata []string
for _, mdVal := range mdRPCBehavior {
splitVals := strings.Split(mdVal, ",")
for _, val := range splitVals {
headerVal := strings.TrimSpace(val)
if headerVal == "" {
continue
}
rpcBehaviorMetadata = append(rpcBehaviorMetadata, headerVal)
}
}
return rpcBehaviorMetadata
}
func getMetadataValues(ctx context.Context, metadataKey string) []string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
logger.Error("Failed to retrieve metadata from incoming RPC context")
return nil
}
return md.Get(metadataKey)
}
// xdsUpdateHealthServiceImpl provides an implementation of the