client: deprecates FailFast and replaces its uses with WaitForReady.

Can Guler, 2018-12-13 15:15:11 -08:00, committed by GitHub
parent 39333409e4
commit 29a7ac4deb
12 changed files with 102 additions and 95 deletions
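
The mapping is mechanical throughout: FailFast(x) and WaitForReady(!x) configure the same behavior, so FailFast(false) call sites become WaitForReady(true) and FailFast(true) becomes WaitForReady(false). A minimal sketch of a migrated call site, assuming the grpc_testing stubs used in the tests below and a placeholder address with an insecure transport (neither is part of this commit):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

func main() {
	// Placeholder target for this sketch; not part of the commit.
	cc, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Dial: %v", err)
	}
	defer cc.Close()

	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Before this commit: grpc.FailFast(false). After: wait for a ready
	// connection (retrying on transient failure) instead of failing fast.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		log.Fatalf("EmptyCall: %v", err)
	}
}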


@@ -192,7 +192,7 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
+ stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}


@@ -398,7 +398,7 @@ func TestGRPCLB(t *testing.T) {
ServerName: lbServerName,
}})
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
@@ -462,7 +462,7 @@ func TestGRPCLBWeighted(t *testing.T) {
tss.ls.sls <- &lbpb.ServerList{Servers: bes}
for i := 0; i < 1000; i++ {
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
@@ -519,7 +519,7 @@ func TestDropRequest(t *testing.T) {
// to true.
var i int
for i = 0; i < 1000; i++ {
- if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+ if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err == nil {
break
}
time.Sleep(time.Millisecond)
@@ -536,12 +536,12 @@ func TestDropRequest(t *testing.T) {
for i := 0; i < 3; i++ {
// Even RPCs should fail, because the 2nd backend has
// DropForLoadBalancing set to true.
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); status.Code(err) != codes.Unavailable {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
}
// Odd RPCs should succeed since they choose the non-drop-request
// backend according to the round robin policy.
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
}
@@ -606,7 +606,7 @@ func TestBalancerDisconnects(t *testing.T) {
}})
var p peer.Peer
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port != tests[0].bePorts[0] {
@@ -617,7 +617,7 @@ func TestBalancerDisconnects(t *testing.T) {
// Stop balancer[0], balancer[1] should be used by grpclb.
// Check peer address to see if that happened.
for i := 0; i < 1000; i++ {
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tests[1].bePorts[0] {
@@ -687,7 +687,7 @@ func TestFallback(t *testing.T) {
}})
var p peer.Peer
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() != beLis.Addr().String() {
@@ -705,7 +705,7 @@ func TestFallback(t *testing.T) {
}})
for i := 0; i < 1000; i++ {
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
@@ -792,7 +792,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for i := 0; i < countRPC-1; i++ {
@@ -842,7 +842,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
- if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
for i := 0; i < countRPC-1; i++ {
@@ -865,7 +865,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
- stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
+ stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
}
@@ -929,7 +929,7 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
testC := testpb.NewTestServiceClient(cc)
// The first non-failfast RPC succeeds, all connections are up.
- stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
+ stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
}


@@ -213,7 +213,7 @@ func TestAddressesRemoved(t *testing.T) {
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
- if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+ if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
return
}
time.Sleep(time.Millisecond)


@@ -246,7 +246,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer cc.Close()
var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Remove the server.
@@ -258,7 +258,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
// Loop until the above update applies.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+ if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
@@ -271,7 +271,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
go func() {
defer wg.Done()
var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
@@ -279,7 +279,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
defer wg.Done()
var reply string
time.Sleep(5 * time.Millisecond)
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
@@ -306,7 +306,7 @@ func TestGetOnWaitChannel(t *testing.T) {
for {
var reply string
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+ if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
@@ -318,7 +318,7 @@ func TestGetOnWaitChannel(t *testing.T) {
go func() {
defer wg.Done()
var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
}()
@@ -378,7 +378,7 @@ func TestOneServerDown(t *testing.T) {
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
- cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false))
+ cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
wg.Done()
}()
}
@@ -437,7 +437,7 @@ func TestOneAddressRemoval(t *testing.T) {
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
}
wg.Done()
@@ -505,7 +505,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
}
defer cc.Close()
var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Remove the server.
@@ -517,7 +517,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
// Loop until the above update applies.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+ if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
@@ -530,7 +530,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
go func() {
defer wg.Done()
var reply string
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
@@ -538,7 +538,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
defer wg.Done()
var reply string
time.Sleep(5 * time.Millisecond)
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
@@ -798,7 +798,7 @@ func TestPickFirstOneAddressRemoval(t *testing.T) {
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
- if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+ if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
}
wg.Done()


@@ -286,7 +286,7 @@ func TestInvokeCancelClosedNonFailFast(t *testing.T) {
req := "hello"
ctx, cancel := context.WithCancel(context.Background())
cancel()
- if err := cc.Invoke(ctx, "/foo/bar", &req, &reply, FailFast(false)); err == nil {
+ if err := cc.Invoke(ctx, "/foo/bar", &req, &reply, WaitForReady(true)); err == nil {
t.Fatalf("canceled invoke on closed connection should fail")
}
server.stop()


@@ -151,7 +151,7 @@ type altsHandshaker struct {
// stub created using the passed conn and used to talk to the ALTS Handshaker
// service in the metadata server.
func NewClientHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn, opts *ClientHandshakerOptions) (core.Handshaker, error) {
- stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.FailFast(false))
+ stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.WaitForReady(true))
if err != nil {
return nil, err
}
@@ -167,7 +167,7 @@ func NewClientHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn,
// stub created using the passed conn and used to talk to the ALTS Handshaker
// service in the metadata server.
func NewServerHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn, opts *ServerHandshakerOptions) (core.Handshaker, error) {
- stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.FailFast(false))
+ stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.WaitForReady(true))
if err != nil {
return nil, err
}


@@ -203,7 +203,7 @@ func TestReflectionEnd2end(t *testing.T) {
defer conn.Close()
c := rpb.NewServerReflectionClient(conn)
- stream, err := c.ServerReflectionInfo(context.Background(), grpc.FailFast(false))
+ stream, err := c.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true))
if err != nil {
t.Fatalf("cannot get ServerReflectionInfo: %v", err)
}


@@ -253,8 +253,8 @@ func (o PeerCallOption) after(c *callInfo) {
}
}

- // FailFast configures the action to take when an RPC is attempted on broken
- // connections or unreachable servers. If failFast is true, the RPC will fail
+ // WaitForReady configures the action to take when an RPC is attempted on broken
+ // connections or unreachable servers. If waitForReady is false, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
// connection is available (or the call is canceled or times out) and will
// retry the call if it fails due to a transient error. gRPC will not retry if
@@ -262,7 +262,14 @@ func (o PeerCallOption) after(c *callInfo) {
// the data. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
//
- // By default, RPCs are "Fail Fast".
+ // By default, RPCs don't "wait for ready".
+ func WaitForReady(waitForReady bool) CallOption {
+ return FailFastCallOption{FailFast: !waitForReady}
+ }
+
+ // FailFast is the opposite of WaitForReady.
+ //
+ // Deprecated: use WaitForReady.
func FailFast(failFast bool) CallOption {
return FailFastCallOption{FailFast: failFast}
}
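
Because WaitForReady is an ordinary CallOption wrapping the existing FailFastCallOption, it composes like any other option and can also be installed once at dial time via WithDefaultCallOptions rather than per call. A hedged sketch, again assuming the grpc_testing stubs used in this commit's tests and a placeholder address with an insecure transport:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

func main() {
	// Make wait-for-ready the default for every RPC on this connection;
	// the address and insecure transport are placeholders for this sketch.
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithInsecure(),
		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
	if err != nil {
		log.Fatalf("Dial: %v", err)
	}
	defer cc.Close()

	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Per-call override: equivalent to the old grpc.FailFast(true),
	// i.e. fail immediately when no connection is ready.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); err != nil {
		log.Printf("EmptyCall: %v", err)
	}
}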


@@ -274,7 +274,7 @@ func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.Simple
}
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(c.failfast))
+ resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast))
return req, resp, err
}
@@ -285,7 +285,7 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
- stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+ stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
if err != nil {
return reqs, resps, err
}
@@ -324,7 +324,7 @@ func (te *test) doClientStreamCall(c *rpcConfig) ([]*testpb.SimpleRequest, *test
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
- stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+ stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
if err != nil {
return reqs, resp, err
}
@@ -359,7 +359,7 @@ func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.SimpleRequest, []*test
startID = errorID
}
req = &testpb.SimpleRequest{Id: startID}
- stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
+ stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast))
if err != nil {
return req, resps, err
}


@@ -215,27 +215,27 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe
test := selector.getNextTest()
switch test {
case "empty_unary":
- interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
+ interop.DoEmptyUnaryCall(client, grpc.WaitForReady(true))
case "large_unary":
- interop.DoLargeUnaryCall(client, grpc.FailFast(false))
+ interop.DoLargeUnaryCall(client, grpc.WaitForReady(true))
case "client_streaming":
- interop.DoClientStreaming(client, grpc.FailFast(false))
+ interop.DoClientStreaming(client, grpc.WaitForReady(true))
case "server_streaming":
- interop.DoServerStreaming(client, grpc.FailFast(false))
+ interop.DoServerStreaming(client, grpc.WaitForReady(true))
case "ping_pong":
- interop.DoPingPong(client, grpc.FailFast(false))
+ interop.DoPingPong(client, grpc.WaitForReady(true))
case "empty_stream":
- interop.DoEmptyStream(client, grpc.FailFast(false))
+ interop.DoEmptyStream(client, grpc.WaitForReady(true))
case "timeout_on_sleeping_server":
- interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
+ interop.DoTimeoutOnSleepingServer(client, grpc.WaitForReady(true))
case "cancel_after_begin":
- interop.DoCancelAfterBegin(client, grpc.FailFast(false))
+ interop.DoCancelAfterBegin(client, grpc.WaitForReady(true))
case "cancel_after_first_response":
- interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
+ interop.DoCancelAfterFirstResponse(client, grpc.WaitForReady(true))
case "status_code_and_message":
- interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
+ interop.DoStatusCodeAndMessage(client, grpc.WaitForReady(true))
case "custom_metadata":
- interop.DoCustomMetadata(client, grpc.FailFast(false))
+ interop.DoCustomMetadata(client, grpc.WaitForReady(true))
}
numCalls++
gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))


@@ -972,7 +972,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
te.srv.Stop()
@@ -987,7 +987,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
t.Fatalf("Timed out waiting for non-ready state")
}
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
- _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
+ _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
cancel()
if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
// If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
@@ -1037,7 +1037,7 @@ func testServerGoAway(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// Finish an RPC to make sure the connection is good.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
ch := make(chan struct{})
@@ -1086,12 +1086,12 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
@@ -1104,7 +1104,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
errored := false
for time.Since(start) < time.Second {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
+ _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
cancel()
if err != nil {
errored = true
@@ -1161,12 +1161,12 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithCancel(context.Background())
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch1 := make(chan struct{})
@@ -1182,7 +1182,7 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
// Loop until the server side GoAway signal is propagated to the client.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
cancel()
break
}
@@ -1248,7 +1248,7 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
@@ -1286,12 +1286,12 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
- stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
@@ -1302,7 +1302,7 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
// Loop until the server side GoAway signal is propagated to the client.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
cancel()
break
}
@@ -1582,10 +1582,10 @@ func TestServiceConfigWaitForReady(t *testing.T) {
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
var err error
- if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
- if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
@@ -1668,13 +1668,13 @@ func TestServiceConfigTimeout(t *testing.T) {
// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
- if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
- if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
cancel()
@@ -1709,13 +1709,13 @@ func TestServiceConfigTimeout(t *testing.T) {
}
ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
- if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
- if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
cancel()
@@ -1790,7 +1790,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) {
}
// Test for unary RPC recv.
- if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
+ if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
@@ -1859,7 +1859,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) {
req.Payload = smallPayload
req.ResponseSize = int32(largeSize)
- if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
+ if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
}
@@ -1921,7 +1921,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) {
req.Payload = smallPayload
req.ResponseSize = int32(largeSize)
- if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil {
+ if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
}
@@ -2022,7 +2022,7 @@ func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
}
@@ -2951,7 +2951,7 @@ func testPeerClientSide(t *testing.T, e env) {
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
peer := new(peer.Peer)
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
pa := peer.Addr.String()
@@ -3128,7 +3128,7 @@ func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
}
var trailer metadata.MD
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
+ if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
}
expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
@@ -3151,7 +3151,7 @@ func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
tc := testpb.NewTestServiceClient(te.clientConn())
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
@@ -3202,7 +3202,7 @@ func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
}
var header metadata.MD
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
+ if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
}
delete(header, "user-agent")
@@ -3247,7 +3247,7 @@ func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
var header metadata.MD
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
+ if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
}
delete(header, "user-agent")
@@ -3291,7 +3291,7 @@ func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
}
var header metadata.MD
ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
- if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
+ if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
}
delete(header, "user-agent")
@@ -3560,7 +3560,7 @@ func testTransparentRetry(t *testing.T, e env) {
successAttempt = tc.successAttempt
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
- _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(tc.failFast))
+ _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast))
cancel()
if status.Code(err) != tc.errCode {
t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
@@ -3689,7 +3689,7 @@ func testNoService(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
- stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
@@ -4216,7 +4216,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
// No rpc should go through due to the max streams limit.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
- if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
}()
@@ -4789,7 +4789,7 @@ func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// This unary call should succeed, because ClientHandshake will succeed for the second time.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
}
}
@@ -4952,7 +4952,7 @@ func TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
// This unary call should fail, but not timeout.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); status.Code(err) != codes.Unavailable {
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
}
}
@@ -5967,10 +5967,10 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
- if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
@@ -6031,12 +6031,12 @@ func testServiceConfigTimeoutTD(t *testing.T, e env) {
tc := testpb.NewTestServiceClient(cc)
// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
- if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
cancel()
@@ -6064,13 +6064,13 @@ func testServiceConfigTimeoutTD(t *testing.T, e env) {
}
ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
}
cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
- if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+ if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
}
cancel()
@@ -6368,7 +6368,7 @@ func TestInterceptorCanAccessCallOptions(t *testing.T) {
}
defaults := []grpc.CallOption{
- grpc.FailFast(false),
+ grpc.WaitForReady(true),
grpc.MaxCallRecvMsgSize(1010),
}
tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
@@ -6400,7 +6400,7 @@ func TestInterceptorCanAccessCallOptions(t *testing.T) {
observedOpts = observedOptions{} // reset
tc.StreamingInputCall(context.Background(),
- grpc.FailFast(true),
+ grpc.WaitForReady(false),
grpc.MaxCallSendMsgSize(2020),
grpc.UseCompressor("comp-type"),
grpc.CallContentSubtype("json"))
@@ -6787,7 +6787,7 @@ func TestDisabledIOBuffers(t *testing.T) {
c := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("Failed to send test RPC to server")
}


@@ -296,7 +296,7 @@ func TestHealthCheckWithGoAway(t *testing.T) {
// the stream rpc will persist through goaway event.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
@@ -456,7 +456,7 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) {
// the stream rpc will persist through goaway event.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+ stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}