mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			728 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			728 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2018 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package test
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net"
 | |
| 	"reflect"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/credentials/insecure"
 | |
| 	"google.golang.org/grpc/internal/grpcsync"
 | |
| 	"google.golang.org/grpc/internal/stubserver"
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| 	"google.golang.org/grpc/stats"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 	"google.golang.org/protobuf/proto"
 | |
| 
 | |
| 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 | |
| 	testpb "google.golang.org/grpc/interop/grpc_testing"
 | |
| )
 | |
| 
 | |
| func (s) TestRetryUnary(t *testing.T) {
 | |
| 	i := -1
 | |
| 	ss := &stubserver.StubServer{
 | |
| 		EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
 | |
| 			defer func() { t.Logf("server call %v returning err %v", i, err) }()
 | |
| 			i++
 | |
| 			switch i {
 | |
| 			case 0, 2, 5:
 | |
| 				return &testpb.Empty{}, nil
 | |
| 			case 6, 8, 11:
 | |
| 				return nil, status.New(codes.Internal, "non-retryable error").Err()
 | |
| 			}
 | |
| 			return nil, status.New(codes.AlreadyExists, "retryable error").Err()
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start([]grpc.ServerOption{},
 | |
| 		grpc.WithDefaultServiceConfig(`{
 | |
|     "methodConfig": [{
 | |
|       "name": [{"service": "grpc.testing.TestService"}],
 | |
|       "waitForReady": true,
 | |
|       "retryPolicy": {
 | |
|         "MaxAttempts": 4,
 | |
|         "InitialBackoff": ".01s",
 | |
|         "MaxBackoff": ".01s",
 | |
|         "BackoffMultiplier": 1.0,
 | |
|         "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
 | |
|       }
 | |
|     }]}`)); err != nil {
 | |
| 		t.Fatalf("Error starting endpoint server: %v", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	testCases := []struct {
 | |
| 		code  codes.Code
 | |
| 		count int
 | |
| 	}{
 | |
| 		{codes.OK, 0},
 | |
| 		{codes.OK, 2},
 | |
| 		{codes.OK, 5},
 | |
| 		{codes.Internal, 6},
 | |
| 		{codes.Internal, 8},
 | |
| 		{codes.Internal, 11},
 | |
| 		{codes.AlreadyExists, 15},
 | |
| 	}
 | |
| 	for num, tc := range testCases {
 | |
| 		t.Log("Case", num)
 | |
| 		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
 | |
| 		cancel()
 | |
| 		if status.Code(err) != tc.code {
 | |
| 			t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
 | |
| 		}
 | |
| 		if i != tc.count {
 | |
| 			t.Fatalf("i = %v; want %v", i, tc.count)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s) TestRetryThrottling(t *testing.T) {
 | |
| 	i := -1
 | |
| 	ss := &stubserver.StubServer{
 | |
| 		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
 | |
| 			i++
 | |
| 			switch i {
 | |
| 			case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18:
 | |
| 				return &testpb.Empty{}, nil
 | |
| 			}
 | |
| 			return nil, status.New(codes.Unavailable, "retryable error").Err()
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start([]grpc.ServerOption{},
 | |
| 		grpc.WithDefaultServiceConfig(`{
 | |
|     "methodConfig": [{
 | |
|       "name": [{"service": "grpc.testing.TestService"}],
 | |
|       "waitForReady": true,
 | |
|       "retryPolicy": {
 | |
|         "MaxAttempts": 4,
 | |
|         "InitialBackoff": ".01s",
 | |
|         "MaxBackoff": ".01s",
 | |
|         "BackoffMultiplier": 1.0,
 | |
|         "RetryableStatusCodes": [ "UNAVAILABLE" ]
 | |
|       }
 | |
|     }],
 | |
|     "retryThrottling": {
 | |
|       "maxTokens": 10,
 | |
|       "tokenRatio": 0.5
 | |
|     }
 | |
|     }`)); err != nil {
 | |
| 		t.Fatalf("Error starting endpoint server: %v", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	testCases := []struct {
 | |
| 		code  codes.Code
 | |
| 		count int
 | |
| 	}{
 | |
| 		{codes.OK, 0},           // tokens = 10
 | |
| 		{codes.OK, 3},           // tokens = 8.5 (10 - 2 failures + 0.5 success)
 | |
| 		{codes.OK, 6},           // tokens = 6
 | |
| 		{codes.Unavailable, 8},  // tokens = 5 -- first attempt is retried; second aborted.
 | |
| 		{codes.Unavailable, 9},  // tokens = 4
 | |
| 		{codes.OK, 10},          // tokens = 4.5
 | |
| 		{codes.OK, 11},          // tokens = 5
 | |
| 		{codes.OK, 12},          // tokens = 5.5
 | |
| 		{codes.OK, 13},          // tokens = 6
 | |
| 		{codes.OK, 14},          // tokens = 6.5
 | |
| 		{codes.OK, 16},          // tokens = 5.5
 | |
| 		{codes.Unavailable, 17}, // tokens = 4.5
 | |
| 	}
 | |
| 	for _, tc := range testCases {
 | |
| 		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
 | |
| 		cancel()
 | |
| 		if status.Code(err) != tc.code {
 | |
| 			t.Errorf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
 | |
| 		}
 | |
| 		if i != tc.count {
 | |
| 			t.Errorf("i = %v; want %v", i, tc.count)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s) TestRetryStreaming(t *testing.T) {
 | |
| 	req := func(b byte) *testpb.StreamingOutputCallRequest {
 | |
| 		return &testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: []byte{b}}}
 | |
| 	}
 | |
| 	res := func(b byte) *testpb.StreamingOutputCallResponse {
 | |
| 		return &testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{Body: []byte{b}}}
 | |
| 	}
 | |
| 
 | |
| 	largePayload, _ := newPayload(testpb.PayloadType_COMPRESSABLE, 500)
 | |
| 
 | |
| 	type serverOp func(stream testgrpc.TestService_FullDuplexCallServer) error
 | |
| 	type clientOp func(stream testgrpc.TestService_FullDuplexCallClient) error
 | |
| 
 | |
| 	// Server Operations
 | |
| 	sAttempts := func(n int) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			const key = "grpc-previous-rpc-attempts"
 | |
| 			md, ok := metadata.FromIncomingContext(stream.Context())
 | |
| 			if !ok {
 | |
| 				return status.Errorf(codes.Internal, "server: no header metadata received")
 | |
| 			}
 | |
| 			if got := md[key]; len(got) != 1 || got[0] != strconv.Itoa(n) {
 | |
| 				return status.Errorf(codes.Internal, "server: metadata = %v; want <contains %q: %q>", md, key, n)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	sReq := func(b byte) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			want := req(b)
 | |
| 			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
 | |
| 				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	sReqPayload := func(p *testpb.Payload) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			want := &testpb.StreamingOutputCallRequest{Payload: p}
 | |
| 			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
 | |
| 				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	sHdr := func() serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			return stream.SendHeader(metadata.Pairs("test_header", "test_value"))
 | |
| 		}
 | |
| 	}
 | |
| 	sRes := func(b byte) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			msg := res(b)
 | |
| 			if err := stream.Send(msg); err != nil {
 | |
| 				return status.Errorf(codes.Internal, "server: Send(%v) = %v; want <nil>", msg, err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	sErr := func(c codes.Code) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			return status.New(c, "this is a test error").Err()
 | |
| 		}
 | |
| 	}
 | |
| 	sCloseSend := func() serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			if msg, err := stream.Recv(); msg != nil || err != io.EOF {
 | |
| 				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want <nil>, io.EOF", msg, err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	sPushback := func(s string) serverOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			stream.SetTrailer(metadata.MD{"grpc-retry-pushback-ms": []string{s}})
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Client Operations
 | |
| 	cReq := func(b byte) clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			msg := req(b)
 | |
| 			if err := stream.Send(msg); err != nil {
 | |
| 				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cReqPayload := func(p *testpb.Payload) clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			msg := &testpb.StreamingOutputCallRequest{Payload: p}
 | |
| 			if err := stream.Send(msg); err != nil {
 | |
| 				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cRes := func(b byte) clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			want := res(b)
 | |
| 			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
 | |
| 				return fmt.Errorf("client: Recv() = %v, %v; want %v, <nil>", got, err, want)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cErr := func(c codes.Code) clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			want := status.New(c, "this is a test error").Err()
 | |
| 			if c == codes.OK {
 | |
| 				want = io.EOF
 | |
| 			}
 | |
| 			res, err := stream.Recv()
 | |
| 			if res != nil ||
 | |
| 				((err == nil) != (want == nil)) ||
 | |
| 				(want != nil && err.Error() != want.Error()) {
 | |
| 				return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cCloseSend := func() clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			if err := stream.CloseSend(); err != nil {
 | |
| 				return fmt.Errorf("client: CloseSend() = %v; want <nil>", err)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	var curTime time.Time
 | |
| 	cGetTime := func() clientOp {
 | |
| 		return func(_ testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			curTime = time.Now()
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cCheckElapsed := func(d time.Duration) clientOp {
 | |
| 		return func(_ testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			if elapsed := time.Since(curTime); elapsed < d {
 | |
| 				return fmt.Errorf("elapsed time: %v; want >= %v", elapsed, d)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	cHdr := func() clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			_, err := stream.Header()
 | |
| 			if err == io.EOF {
 | |
| 				// The stream ended successfully; convert to nil to avoid
 | |
| 				// erroring the test case.
 | |
| 				err = nil
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	cCtx := func() clientOp {
 | |
| 		return func(stream testgrpc.TestService_FullDuplexCallClient) error {
 | |
| 			stream.Context()
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	testCases := []struct {
 | |
| 		desc      string
 | |
| 		serverOps []serverOp
 | |
| 		clientOps []clientOp
 | |
| 	}{{
 | |
| 		desc:      "Non-retryable error code",
 | |
| 		serverOps: []serverOp{sReq(1), sErr(codes.Internal)},
 | |
| 		clientOps: []clientOp{cReq(1), cErr(codes.Internal)},
 | |
| 	}, {
 | |
| 		desc:      "One retry necessary",
 | |
| 		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1), sRes(1)},
 | |
| 		clientOps: []clientOp{cReq(1), cRes(1), cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc: "Exceed max attempts (4); check attempts header on server",
 | |
| 		serverOps: []serverOp{
 | |
| 			sReq(1), sErr(codes.Unavailable),
 | |
| 			sReq(1), sAttempts(1), sErr(codes.Unavailable),
 | |
| 			sAttempts(2), sReq(1), sErr(codes.Unavailable),
 | |
| 			sAttempts(3), sReq(1), sErr(codes.Unavailable),
 | |
| 		},
 | |
| 		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc: "Multiple requests",
 | |
| 		serverOps: []serverOp{
 | |
| 			sReq(1), sReq(2), sErr(codes.Unavailable),
 | |
| 			sReq(1), sReq(2), sRes(5),
 | |
| 		},
 | |
| 		clientOps: []clientOp{cReq(1), cReq(2), cRes(5), cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc: "Multiple successive requests",
 | |
| 		serverOps: []serverOp{
 | |
| 			sReq(1), sErr(codes.Unavailable),
 | |
| 			sReq(1), sReq(2), sErr(codes.Unavailable),
 | |
| 			sReq(1), sReq(2), sReq(3), sRes(5),
 | |
| 		},
 | |
| 		clientOps: []clientOp{cReq(1), cReq(2), cReq(3), cRes(5), cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc: "No retry after receiving",
 | |
| 		serverOps: []serverOp{
 | |
| 			sReq(1), sErr(codes.Unavailable),
 | |
| 			sReq(1), sRes(3), sErr(codes.Unavailable),
 | |
| 		},
 | |
| 		clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc:      "Retry via ClientStream.Header()",
 | |
| 		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1)},
 | |
| 		clientOps: []clientOp{cReq(1), cHdr() /* this should cause a retry */, cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc:      "No retry after header",
 | |
| 		serverOps: []serverOp{sReq(1), sHdr(), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc:      "No retry after context",
 | |
| 		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReq(1), cCtx(), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc: "Replaying close send",
 | |
| 		serverOps: []serverOp{
 | |
| 			sReq(1), sReq(2), sCloseSend(), sErr(codes.Unavailable),
 | |
| 			sReq(1), sReq(2), sCloseSend(), sRes(1), sRes(3), sRes(5),
 | |
| 		},
 | |
| 		clientOps: []clientOp{cReq(1), cReq(2), cCloseSend(), cRes(1), cRes(3), cRes(5), cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc:      "Negative server pushback - no retry",
 | |
| 		serverOps: []serverOp{sReq(1), sPushback("-1"), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc:      "Non-numeric server pushback - no retry",
 | |
| 		serverOps: []serverOp{sReq(1), sPushback("xxx"), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc:      "Multiple server pushback values - no retry",
 | |
| 		serverOps: []serverOp{sReq(1), sPushback("100"), sPushback("10"), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
 | |
| 	}, {
 | |
| 		desc:      "1s server pushback - delayed retry",
 | |
| 		serverOps: []serverOp{sReq(1), sPushback("1000"), sErr(codes.Unavailable), sReq(1), sRes(2)},
 | |
| 		clientOps: []clientOp{cGetTime(), cReq(1), cRes(2), cCheckElapsed(time.Second), cErr(codes.OK)},
 | |
| 	}, {
 | |
| 		desc:      "Overflowing buffer - no retry",
 | |
| 		serverOps: []serverOp{sReqPayload(largePayload), sErr(codes.Unavailable)},
 | |
| 		clientOps: []clientOp{cReqPayload(largePayload), cErr(codes.Unavailable)},
 | |
| 	}}
 | |
| 
 | |
| 	var serverOpIter int
 | |
| 	var serverOps []serverOp
 | |
| 	ss := &stubserver.StubServer{
 | |
| 		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			for serverOpIter < len(serverOps) {
 | |
| 				op := serverOps[serverOpIter]
 | |
| 				serverOpIter++
 | |
| 				if err := op(stream); err != nil {
 | |
| 					return err
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		},
 | |
| 	}
 | |
| 	if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200)),
 | |
| 		grpc.WithDefaultServiceConfig(`{
 | |
|     "methodConfig": [{
 | |
|       "name": [{"service": "grpc.testing.TestService"}],
 | |
|       "waitForReady": true,
 | |
|       "retryPolicy": {
 | |
|           "MaxAttempts": 4,
 | |
|           "InitialBackoff": ".01s",
 | |
|           "MaxBackoff": ".01s",
 | |
|           "BackoffMultiplier": 1.0,
 | |
|           "RetryableStatusCodes": [ "UNAVAILABLE" ]
 | |
|       }
 | |
|     }]}`)); err != nil {
 | |
| 		t.Fatalf("Error starting endpoint server: %v", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	for {
 | |
| 		if ctx.Err() != nil {
 | |
| 			t.Fatalf("Timed out waiting for service config update")
 | |
| 		}
 | |
| 		if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
 | |
| 			break
 | |
| 		}
 | |
| 		time.Sleep(time.Millisecond)
 | |
| 	}
 | |
| 
 | |
| 	for _, tc := range testCases {
 | |
| 		func() {
 | |
| 			serverOpIter = 0
 | |
| 			serverOps = tc.serverOps
 | |
| 
 | |
| 			stream, err := ss.Client.FullDuplexCall(ctx)
 | |
| 			if err != nil {
 | |
| 				t.Fatalf("%v: Error while creating stream: %v", tc.desc, err)
 | |
| 			}
 | |
| 			for _, op := range tc.clientOps {
 | |
| 				if err := op(stream); err != nil {
 | |
| 					t.Errorf("%v: %v", tc.desc, err)
 | |
| 					break
 | |
| 				}
 | |
| 			}
 | |
| 			if serverOpIter != len(serverOps) {
 | |
| 				t.Errorf("%v: serverOpIter = %v; want %v", tc.desc, serverOpIter, len(serverOps))
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type retryStatsHandler struct {
 | |
| 	mu sync.Mutex
 | |
| 	s  []stats.RPCStats
 | |
| }
 | |
| 
 | |
| func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
 | |
| 	return ctx
 | |
| }
 | |
| func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
 | |
| 	// these calls come in nondeterministically - so can just ignore
 | |
| 	if _, ok := s.(*stats.PickerUpdated); ok {
 | |
| 		return
 | |
| 	}
 | |
| 	h.mu.Lock()
 | |
| 	h.s = append(h.s, s)
 | |
| 	h.mu.Unlock()
 | |
| }
 | |
| func (*retryStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
 | |
| 	return ctx
 | |
| }
 | |
| func (*retryStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
 | |
| 
 | |
| func (s) TestRetryStats(t *testing.T) {
 | |
| 	lis, err := net.Listen("tcp", "localhost:0")
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to listen. Err: %v", err)
 | |
| 	}
 | |
| 	defer lis.Close()
 | |
| 	server := &httpServer{
 | |
| 		waitForEndStream: true,
 | |
| 		responses: []httpServerResponse{{
 | |
| 			trailers: [][]string{{
 | |
| 				":status", "200",
 | |
| 				"content-type", "application/grpc",
 | |
| 				"grpc-status", "14", // UNAVAILABLE
 | |
| 				"grpc-message", "unavailable retry",
 | |
| 				"grpc-retry-pushback-ms", "10",
 | |
| 			}},
 | |
| 		}, {
 | |
| 			headers: [][]string{{
 | |
| 				":status", "200",
 | |
| 				"content-type", "application/grpc",
 | |
| 			}},
 | |
| 			payload: []byte{0, 0, 0, 0, 0}, // header for 0-byte response message.
 | |
| 			trailers: [][]string{{
 | |
| 				"grpc-status", "0", // OK
 | |
| 			}},
 | |
| 		}},
 | |
| 		refuseStream: func(i uint32) bool {
 | |
| 			return i == 1
 | |
| 		},
 | |
| 	}
 | |
| 	server.start(t, lis)
 | |
| 	handler := &retryStatsHandler{}
 | |
| 	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(handler),
 | |
| 		grpc.WithDefaultServiceConfig((`{
 | |
|     "methodConfig": [{
 | |
|       "name": [{"service": "grpc.testing.TestService"}],
 | |
|       "retryPolicy": {
 | |
|           "MaxAttempts": 4,
 | |
|           "InitialBackoff": ".01s",
 | |
|           "MaxBackoff": ".01s",
 | |
|           "BackoffMultiplier": 1.0,
 | |
|           "RetryableStatusCodes": [ "UNAVAILABLE" ]
 | |
|       }
 | |
|     }]}`)))
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("failed to dial due to err: %v", err)
 | |
| 	}
 | |
| 	defer cc.Close()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	client := testgrpc.NewTestServiceClient(cc)
 | |
| 
 | |
| 	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
 | |
| 		t.Fatalf("unexpected EmptyCall error: %v", err)
 | |
| 	}
 | |
| 	handler.mu.Lock()
 | |
| 	want := []stats.RPCStats{
 | |
| 		&stats.Begin{},
 | |
| 		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
 | |
| 		&stats.OutPayload{WireLength: 5},
 | |
| 		&stats.End{},
 | |
| 
 | |
| 		&stats.Begin{IsTransparentRetryAttempt: true},
 | |
| 		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
 | |
| 		&stats.OutPayload{WireLength: 5},
 | |
| 		&stats.InTrailer{Trailer: metadata.Pairs("content-type", "application/grpc", "grpc-retry-pushback-ms", "10")},
 | |
| 		&stats.End{},
 | |
| 
 | |
| 		&stats.Begin{},
 | |
| 		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
 | |
| 		&stats.OutPayload{WireLength: 5},
 | |
| 		&stats.InHeader{},
 | |
| 		&stats.InPayload{WireLength: 5},
 | |
| 		&stats.InTrailer{},
 | |
| 		&stats.End{},
 | |
| 	}
 | |
| 
 | |
| 	toString := func(ss []stats.RPCStats) (ret []string) {
 | |
| 		for _, s := range ss {
 | |
| 			ret = append(ret, fmt.Sprintf("%T - %v", s, s))
 | |
| 		}
 | |
| 		return ret
 | |
| 	}
 | |
| 	t.Logf("Handler received frames:\n%v\n---\nwant:\n%v\n",
 | |
| 		strings.Join(toString(handler.s), "\n"),
 | |
| 		strings.Join(toString(want), "\n"))
 | |
| 
 | |
| 	if len(handler.s) != len(want) {
 | |
| 		t.Fatalf("received unexpected number of RPCStats: got %v; want %v", len(handler.s), len(want))
 | |
| 	}
 | |
| 
 | |
| 	// There is a race between receiving the payload (triggered by the
 | |
| 	// application / gRPC library) and receiving the trailer (triggered at the
 | |
| 	// transport layer).  Adjust the received stats accordingly if necessary.
 | |
| 	const tIdx, pIdx = 13, 14
 | |
| 	_, okT := handler.s[tIdx].(*stats.InTrailer)
 | |
| 	_, okP := handler.s[pIdx].(*stats.InPayload)
 | |
| 	if okT && okP {
 | |
| 		handler.s[pIdx], handler.s[tIdx] = handler.s[tIdx], handler.s[pIdx]
 | |
| 	}
 | |
| 
 | |
| 	for i := range handler.s {
 | |
| 		w, s := want[i], handler.s[i]
 | |
| 
 | |
| 		// Validate the event type
 | |
| 		if reflect.TypeOf(w) != reflect.TypeOf(s) {
 | |
| 			t.Fatalf("at position %v: got %T; want %T", i, s, w)
 | |
| 		}
 | |
| 		wv, sv := reflect.ValueOf(w).Elem(), reflect.ValueOf(s).Elem()
 | |
| 
 | |
| 		// Validate that Client is always true
 | |
| 		if sv.FieldByName("Client").Interface().(bool) != true {
 | |
| 			t.Fatalf("at position %v: got Client=false; want true", i)
 | |
| 		}
 | |
| 
 | |
| 		// Validate any set fields in want
 | |
| 		for i := 0; i < wv.NumField(); i++ {
 | |
| 			if !wv.Field(i).IsZero() {
 | |
| 				if got, want := sv.Field(i).Interface(), wv.Field(i).Interface(); !reflect.DeepEqual(got, want) {
 | |
| 					name := reflect.TypeOf(w).Elem().Field(i).Name
 | |
| 					t.Fatalf("at position %v, field %v: got %v; want %v", i, name, got, want)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Since the above only tests non-zero-value fields, test
 | |
| 		// IsTransparentRetryAttempt=false explicitly when needed.
 | |
| 		if wb, ok := w.(*stats.Begin); ok && !wb.IsTransparentRetryAttempt {
 | |
| 			if s.(*stats.Begin).IsTransparentRetryAttempt {
 | |
| 				t.Fatalf("at position %v: got IsTransparentRetryAttempt=true; want false", i)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Validate timings between last Begin and preceding End.
 | |
| 	end := handler.s[8].(*stats.End)
 | |
| 	begin := handler.s[9].(*stats.Begin)
 | |
| 	diff := begin.BeginTime.Sub(end.EndTime)
 | |
| 	if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
 | |
| 		t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
 | |
| 	// With MaxConcurrentStreams=1:
 | |
| 	//
 | |
| 	// 1. Create stream 1 that is retriable.
 | |
| 	// 2. Stream 1 is created and fails with a retriable code.
 | |
| 	// 3. Create dummy stream 2, blocking indefinitely.
 | |
| 	// 4. Stream 1 retries (and blocks until stream 2 finishes)
 | |
| 	// 5. Stream 1 is canceled manually.
 | |
| 	//
 | |
| 	// If there is no bug, the stream is done and errors with CANCELED.  With a bug:
 | |
| 	//
 | |
| 	// 6. Stream 1 has a nil stream (attempt.s).  Operations like CloseSend will panic.
 | |
| 
 | |
| 	first := grpcsync.NewEvent()
 | |
| 	ss := &stubserver.StubServer{
 | |
| 		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
 | |
| 			// signal?
 | |
| 			if !first.HasFired() {
 | |
| 				first.Fire()
 | |
| 				t.Log("returned first error")
 | |
| 				return status.Error(codes.AlreadyExists, "first attempt fails and is retriable")
 | |
| 			}
 | |
| 			t.Log("blocking")
 | |
| 			<-stream.Context().Done()
 | |
| 			return stream.Context().Err()
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)},
 | |
| 		grpc.WithDefaultServiceConfig(`{
 | |
|     "methodConfig": [{
 | |
|       "name": [{"service": "grpc.testing.TestService"}],
 | |
|       "waitForReady": true,
 | |
|       "retryPolicy": {
 | |
|         "MaxAttempts": 2,
 | |
|         "InitialBackoff": ".1s",
 | |
|         "MaxBackoff": ".1s",
 | |
|         "BackoffMultiplier": 1.0,
 | |
|         "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
 | |
|       }
 | |
|     }]}`)); err != nil {
 | |
| 		t.Fatalf("Error starting endpoint server: %v", err)
 | |
| 	}
 | |
| 	defer ss.Stop()
 | |
| 
 | |
| 	ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel1()
 | |
| 	ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel2()
 | |
| 
 | |
| 	stream1, err := ss.Client.FullDuplexCall(ctx1)
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Error creating stream 1: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// Create dummy stream to block indefinitely.
 | |
| 	_, err = ss.Client.FullDuplexCall(ctx2)
 | |
| 	if err != nil {
 | |
| 		t.Errorf("Error creating stream 2: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	stream1Closed := grpcsync.NewEvent()
 | |
| 	go func() {
 | |
| 		_, err := stream1.Recv()
 | |
| 		// Will trigger a retry when it sees the ALREADY_EXISTS error
 | |
| 		if status.Code(err) != codes.Canceled {
 | |
| 			t.Errorf("Expected stream1 to be canceled; got error: %v", err)
 | |
| 		}
 | |
| 		stream1Closed.Fire()
 | |
| 	}()
 | |
| 
 | |
| 	// Wait longer than the retry backoff timer.
 | |
| 	time.Sleep(200 * time.Millisecond)
 | |
| 	cancel1()
 | |
| 
 | |
| 	// Operations on the stream should not panic.
 | |
| 	<-stream1Closed.Done()
 | |
| 	stream1.CloseSend()
 | |
| 	stream1.Recv()
 | |
| 	stream1.Send(&testpb.StreamingOutputCallRequest{})
 | |
| }
 |