From 29a7ac4deb3705f2600e55ea6a410ac2de1612dc Mon Sep 17 00:00:00 2001
From: Can Guler
Date: Thu, 13 Dec 2018 15:15:11 -0800
Subject: [PATCH] client: deprecate FailFast and replace its uses with WaitForReady

---
 balancer/grpclb/grpclb_remote_balancer.go  |  2 +-
 balancer/grpclb/grpclb_test.go             | 26 +++---
 balancer/roundrobin/roundrobin_test.go     |  2 +-
 balancer_test.go                           | 26 +++---
 call_test.go                               |  2 +-
 .../alts/internal/handshaker/handshaker.go |  4 +-
 reflection/serverreflection_test.go        |  2 +-
 rpc_util.go                                | 13 ++-
 stats/stats_test.go                        |  8 +-
 stress/client/main.go                      | 22 ++--
 test/end2end_test.go                       | 86 +++++++++----------
 test/healthcheck_test.go                   |  4 +-
 12 files changed, 102 insertions(+), 95 deletions(-)

diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go
index c1cb0e128..cb301d238 100644
--- a/balancer/grpclb/grpclb_remote_balancer.go
+++ b/balancer/grpclb/grpclb_remote_balancer.go
@@ -192,7 +192,7 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
 	lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
+	stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
 	}
diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go
index 39285cd5a..460c852e6 100644
--- a/balancer/grpclb/grpclb_test.go
+++ b/balancer/grpclb/grpclb_test.go
@@ -398,7 +398,7 @@ func TestGRPCLB(t *testing.T) {
 		ServerName: lbServerName,
 	}})
 
-	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 	}
 }
@@ -462,7 +462,7 @@ func TestGRPCLBWeighted(t *testing.T) {
 	tss.ls.sls <- &lbpb.ServerList{Servers: bes}
 
 	for i := 0; i < 1000; i++ {
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
 			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 		result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
@@ -519,7 +519,7 @@ func TestDropRequest(t *testing.T) {
 	// to true.
 	var i int
 	for i = 0; i < 1000; i++ {
-		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err == nil {
 			break
 		}
 		time.Sleep(time.Millisecond)
@@ -536,12 +536,12 @@
 	for i := 0; i < 3; i++ {
 		// Even RPCs should fail, because the 2nd backend has
 		// DropForLoadBalancing set to true.
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); status.Code(err) != codes.Unavailable {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
 			t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
 		}
 		// Odd RPCs should succeed since they choose the non-drop-request
 		// backend according to the round robin policy.
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
 			t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 	}
@@ -606,7 +606,7 @@ func TestBalancerDisconnects(t *testing.T) {
 	}})
 
 	var p peer.Peer
-	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 	}
 	if p.Addr.(*net.TCPAddr).Port != tests[0].bePorts[0] {
@@ -617,7 +617,7 @@
 	// Stop balancer[0], balancer[1] should be used by grpclb.
 	// Check peer address to see if that happened.
 	for i := 0; i < 1000; i++ {
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
 			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 		if p.Addr.(*net.TCPAddr).Port == tests[1].bePorts[0] {
@@ -687,7 +687,7 @@ func TestFallback(t *testing.T) {
 	}})
 
 	var p peer.Peer
-	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 	}
 	if p.Addr.String() != beLis.Addr().String() {
@@ -705,7 +705,7 @@
 	}})
 
 	for i := 0; i < 1000; i++ {
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
 			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 		if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
@@ -792,7 +792,7 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
 	stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
 		// The first non-failfast RPC succeeds, all connections are up.
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 		for i := 0; i < countRPC-1; i++ {
@@ -842,7 +842,7 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
 	stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
 		// The first non-failfast RPC succeeds, all connections are up.
-		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
 		for i := 0; i < countRPC-1; i++ {
@@ -865,7 +865,7 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
 	stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
 		// The first non-failfast RPC succeeds, all connections are up.
-		stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
+		stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
 		if err != nil {
 			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
@@ -929,7 +929,7 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
 	stats := runAndGetStats(t, false, func(cc *grpc.ClientConn) {
 		testC := testpb.NewTestServiceClient(cc)
 		// The first non-failfast RPC succeeds, all connections are up.
-		stream, err := testC.FullDuplexCall(context.Background(), grpc.FailFast(false))
+		stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
 		if err != nil {
 			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
 		}
diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go
index f84c30681..d56baef12 100644
--- a/balancer/roundrobin/roundrobin_test.go
+++ b/balancer/roundrobin/roundrobin_test.go
@@ -213,7 +213,7 @@ func TestAddressesRemoved(t *testing.T) {
 	for i := 0; i < 1000; i++ {
 		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
 		defer cancel()
-		if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+		if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
 			return
 		}
 		time.Sleep(time.Millisecond)
diff --git a/balancer_test.go b/balancer_test.go
index 219dcacac..cdf43afb1 100644
--- a/balancer_test.go
+++ b/balancer_test.go
@@ -246,7 +246,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
 	}
 	defer cc.Close()
 	var reply string
-	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
 		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
 	}
 	// Remove the server.
@@ -258,7 +258,7 @@
 	// Loop until the above update applies.
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
 			cancel()
 			break
 		}
@@ -271,7 +271,7 @@
 	go func() {
 		defer wg.Done()
 		var reply string
-		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
 			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
 		}
 	}()
@@ -279,7 +279,7 @@
 		defer wg.Done()
 		var reply string
 		time.Sleep(5 * time.Millisecond)
-		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
 			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
 		}
 	}()
@@ -306,7 +306,7 @@ func TestGetOnWaitChannel(t *testing.T) {
 	for {
 		var reply string
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
 			cancel()
 			break
 		}
@@ -318,7 +318,7 @@
 	go func() {
 		defer wg.Done()
 		var reply string
-		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
 			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
 		}
 	}()
@@ -378,7 +378,7 @@ func TestOneServerDown(t *testing.T) {
 			time.Sleep(sleepDuration)
 			// After sleepDuration, invoke RPC.
 			// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
-			cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false))
+			cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
 			wg.Done()
 		}()
 	}
@@ -437,7 +437,7 @@ func TestOneAddressRemoval(t *testing.T) {
 			time.Sleep(sleepDuration)
 			// After sleepDuration, invoke RPC.
 			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
-			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
 				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
 			}
 			wg.Done()
@@ -505,7 +505,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) {
 	}
 	defer cc.Close()
 	var reply string
-	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
 		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
 	}
 	// Remove the server.
@@ -517,7 +517,7 @@
 	// Loop until the above update applies.
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
+		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
 			cancel()
 			break
 		}
@@ -530,7 +530,7 @@
 	go func() {
 		defer wg.Done()
 		var reply string
-		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
 			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
 		}
 	}()
@@ -538,7 +538,7 @@
 		defer wg.Done()
 		var reply string
 		time.Sleep(5 * time.Millisecond)
-		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
+		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
 			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
 		}
 	}()
@@ -798,7 +798,7 @@ func TestPickFirstOneAddressRemoval(t *testing.T) {
 			time.Sleep(sleepDuration)
 			// After sleepDuration, invoke RPC.
 			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
-			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
+			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
 				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
 			}
 			wg.Done()
diff --git a/call_test.go b/call_test.go
index 7800c5d84..0697f913a 100644
--- a/call_test.go
+++ b/call_test.go
@@ -286,7 +286,7 @@ func TestInvokeCancelClosedNonFailFast(t *testing.T) {
 	req := "hello"
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
-	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply, FailFast(false)); err == nil {
+	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply, WaitForReady(true)); err == nil {
 		t.Fatalf("canceled invoke on closed connection should fail")
 	}
 	server.stop()
diff --git a/credentials/alts/internal/handshaker/handshaker.go b/credentials/alts/internal/handshaker/handshaker.go
index 82a1c9c58..49c22c1e8 100644
--- a/credentials/alts/internal/handshaker/handshaker.go
+++ b/credentials/alts/internal/handshaker/handshaker.go
@@ -151,7 +151,7 @@ type altsHandshaker struct {
 // stub created using the passed conn and used to talk to the ALTS Handshaker
 // service in the metadata server.
 func NewClientHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn, opts *ClientHandshakerOptions) (core.Handshaker, error) {
-	stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.FailFast(false))
+	stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		return nil, err
 	}
@@ -167,7 +167,7 @@ func NewClientHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn,
 // stub created using the passed conn and used to talk to the ALTS Handshaker
 // service in the metadata server.
 func NewServerHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn, opts *ServerHandshakerOptions) (core.Handshaker, error) {
-	stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.FailFast(false))
+	stream, err := altsgrpc.NewHandshakerServiceClient(conn).DoHandshake(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		return nil, err
 	}
diff --git a/reflection/serverreflection_test.go b/reflection/serverreflection_test.go
index 9078b2055..ed31780ed 100644
--- a/reflection/serverreflection_test.go
+++ b/reflection/serverreflection_test.go
@@ -203,7 +203,7 @@ func TestReflectionEnd2end(t *testing.T) {
 	defer conn.Close()
 
 	c := rpb.NewServerReflectionClient(conn)
-	stream, err := c.ServerReflectionInfo(context.Background(), grpc.FailFast(false))
+	stream, err := c.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("cannot get ServerReflectionInfo: %v", err)
 	}
diff --git a/rpc_util.go b/rpc_util.go
index 86f00e5a2..d7cf89f1b 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -253,8 +253,8 @@ func (o PeerCallOption) after(c *callInfo) {
 	}
 }
 
-// FailFast configures the action to take when an RPC is attempted on broken
-// connections or unreachable servers. If failFast is true, the RPC will fail
+// WaitForReady configures the action to take when an RPC is attempted on broken
+// connections or unreachable servers. If waitForReady is false, the RPC will fail
 // immediately. Otherwise, the RPC client will block the call until a
 // connection is available (or the call is canceled or times out) and will
 // retry the call if it fails due to a transient error. gRPC will not retry if
@@ -262,7 +262,14 @@ func (o PeerCallOption) after(c *callInfo) {
 // the data. Please refer to
 // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
 //
-// By default, RPCs are "Fail Fast".
+// By default, RPCs don't "wait for ready".
+func WaitForReady(waitForReady bool) CallOption {
+	return FailFastCallOption{FailFast: !waitForReady}
+}
+
+// FailFast is the opposite of WaitForReady.
+//
+// Deprecated: use WaitForReady.
 func FailFast(failFast bool) CallOption {
 	return FailFastCallOption{FailFast: failFast}
 }
diff --git a/stats/stats_test.go b/stats/stats_test.go
index 008d684c1..b78c99132 100644
--- a/stats/stats_test.go
+++ b/stats/stats_test.go
@@ -274,7 +274,7 @@ func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.Simple
 	}
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
 
-	resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(c.failfast))
+	resp, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(!c.failfast))
 	return req, resp, err
 }
 
@@ -285,7 +285,7 @@
 		err error
 	)
 	tc := testpb.NewTestServiceClient(te.clientConn())
-	stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+	stream, err := tc.FullDuplexCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
 	if err != nil {
 		return reqs, resps, err
 	}
@@ -324,7 +324,7 @@
 		err error
 	)
 	tc := testpb.NewTestServiceClient(te.clientConn())
-	stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
+	stream, err := tc.ClientStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), grpc.WaitForReady(!c.failfast))
 	if err != nil {
 		return reqs, resp, err
 	}
@@ -359,7 +359,7 @@
 		startID = errorID
 	}
 	req = &testpb.SimpleRequest{Id: startID}
-	stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.FailFast(c.failfast))
+	stream, err := tc.ServerStreamCall(metadata.NewOutgoingContext(context.Background(), testMetadata), req, grpc.WaitForReady(!c.failfast))
 	if err != nil {
 		return req, resps, err
 	}
diff --git a/stress/client/main.go b/stress/client/main.go
index 82febbc24..d9115c943 100644
--- a/stress/client/main.go
+++ b/stress/client/main.go
@@ -215,27 +215,27 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe
 		test := selector.getNextTest()
 		switch test {
 		case "empty_unary":
-			interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
+			interop.DoEmptyUnaryCall(client, grpc.WaitForReady(true))
 		case "large_unary":
-			interop.DoLargeUnaryCall(client, grpc.FailFast(false))
+			interop.DoLargeUnaryCall(client, grpc.WaitForReady(true))
 		case "client_streaming":
-			interop.DoClientStreaming(client, grpc.FailFast(false))
+			interop.DoClientStreaming(client, grpc.WaitForReady(true))
 		case "server_streaming":
-			interop.DoServerStreaming(client, grpc.FailFast(false))
+			interop.DoServerStreaming(client, grpc.WaitForReady(true))
 		case "ping_pong":
-			interop.DoPingPong(client, grpc.FailFast(false))
+			interop.DoPingPong(client, grpc.WaitForReady(true))
 		case "empty_stream":
-			interop.DoEmptyStream(client, grpc.FailFast(false))
+			interop.DoEmptyStream(client, grpc.WaitForReady(true))
 		case "timeout_on_sleeping_server":
-			interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
+			interop.DoTimeoutOnSleepingServer(client, grpc.WaitForReady(true))
 		case "cancel_after_begin":
-			interop.DoCancelAfterBegin(client, grpc.FailFast(false))
+			interop.DoCancelAfterBegin(client, grpc.WaitForReady(true))
 		case "cancel_after_first_response":
-			interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
+			interop.DoCancelAfterFirstResponse(client, grpc.WaitForReady(true))
 		case "status_code_and_message":
-			interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
+			interop.DoStatusCodeAndMessage(client, grpc.WaitForReady(true))
 		case "custom_metadata":
-			interop.DoCustomMetadata(client, grpc.FailFast(false))
+			interop.DoCustomMetadata(client, grpc.WaitForReady(true))
 		}
 		numCalls++
 		gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 8fdca906a..c568f347e 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -972,7 +972,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
 	}
 	te.srv.Stop()
@@ -987,7 +987,7 @@
 		t.Fatalf("Timed out waiting for non-ready state")
 	}
 	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
-	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
+	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
 	cancel()
 	if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
 		// If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
@@ -1037,7 +1037,7 @@ func testServerGoAway(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	// Finish an RPC to make sure the connection is good.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
 	}
 	ch := make(chan struct{})
@@ -1086,12 +1086,12 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
 	// Finish an RPC to make sure the connection is good.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
 	}
 	ch := make(chan struct{})
@@ -1104,7 +1104,7 @@
 	errored := false
 	for time.Since(start) < time.Second {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
+		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
 		cancel()
 		if err != nil {
 			errored = true
@@ -1161,12 +1161,12 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	ctx, cancel := context.WithCancel(context.Background())
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
 	// Finish an RPC to make sure the connection is good.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
 	}
 	ch1 := make(chan struct{})
@@ -1182,7 +1182,7 @@
 	// Loop until the server side GoAway signal is propagated to the client.
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 			cancel()
 			break
 		}
@@ -1248,7 +1248,7 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
 	}
 	ch := make(chan struct{})
@@ -1286,12 +1286,12 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 
-	stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
 	// Finish an RPC to make sure the connection is good.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
 	}
 	ch := make(chan struct{})
@@ -1302,7 +1302,7 @@
 	// Loop until the server side GoAway signal is propagated to the client.
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
-		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 			cancel()
 			break
 		}
@@ -1582,10 +1582,10 @@ func TestServiceConfigWaitForReady(t *testing.T) {
 	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
 	var err error
-	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
-	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 
@@ -1668,13 +1668,13 @@ func TestServiceConfigTimeout(t *testing.T) {
 	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
 	var err error
 	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
-	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
 
 	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
-	if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
@@ -1709,13 +1709,13 @@
 	}
 
 	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
-	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
 
 	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
-	if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
@@ -1790,7 +1790,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) {
 	}
 
 	// Test for unary RPC recv.
-	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
+	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
 		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
 	}
 
@@ -1859,7 +1859,7 @@
 	req.Payload = smallPayload
 	req.ResponseSize = int32(largeSize)
 
-	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
+	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
 		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
 	}
 
@@ -1921,7 +1921,7 @@
 	req.Payload = smallPayload
 	req.ResponseSize = int32(largeSize)
 
-	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil {
+	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
 	}
 
@@ -2022,7 +2022,7 @@ func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
 	}
@@ -2951,7 +2951,7 @@ func testPeerClientSide(t *testing.T, e env) {
 	defer te.tearDown()
 	tc := testpb.NewTestServiceClient(te.clientConn())
 	peer := new(peer.Peer)
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
 	}
 	pa := peer.Addr.String()
@@ -3128,7 +3128,7 @@ func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
 	}
 	var trailer metadata.MD
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
-	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
+	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
 	}
 	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
@@ -3151,7 +3151,7 @@ func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
 	tc := testpb.NewTestServiceClient(te.clientConn())
 
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
@@ -3202,7 +3202,7 @@ func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
 	}
 	var header metadata.MD
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
-	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
+	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
 	}
 	delete(header, "user-agent")
@@ -3247,7 +3247,7 @@ func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
 
 	var header metadata.MD
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
-	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
+	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
 		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
 	}
 	delete(header, "user-agent")
@@ -3291,7 +3291,7 @@ func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
 	}
 	var header metadata.MD
 	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
-	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
+	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
 		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
 	}
 	delete(header, "user-agent")
@@ -3560,7 +3560,7 @@ func testTransparentRetry(t *testing.T, e env) {
 		successAttempt = tc.successAttempt
 
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(tc.failFast))
+		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast))
 		cancel()
 		if status.Code(err) != tc.errCode {
 			t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
@@ -3689,7 +3689,7 @@ func testNoService(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 
-	stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
@@ -4216,7 +4216,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
 			// No rpc should go through due to the max streams limit.
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
 			defer cancel()
-			if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+			if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 				t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 			}
 		}()
@@ -4789,7 +4789,7 @@ func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	// This unary call should succeed, because ClientHandshake will succeed for the second time.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
 		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
 	}
 }
@@ -4952,7 +4952,7 @@ func TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
 	// This unary call should fail, but not timeout.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
-	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); status.Code(err) != codes.Unavailable {
+	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
 	}
 }
@@ -5967,10 +5967,10 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
 	cc := te.clientConn()
 	tc := testpb.NewTestServiceClient(cc)
 	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
-	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 
@@ -6031,12 +6031,12 @@ func testServiceConfigTimeoutTD(t *testing.T, e env) {
 	tc := testpb.NewTestServiceClient(cc)
 	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
-	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
 	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
-	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
@@ -6064,13 +6064,13 @@
 	}
 
 	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
-	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
 
 	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
-	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
+	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
 		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
 	}
 	cancel()
@@ -6368,7 +6368,7 @@ func TestInterceptorCanAccessCallOptions(t *testing.T) {
 	}
 
 	defaults := []grpc.CallOption{
-		grpc.FailFast(false),
+		grpc.WaitForReady(true),
 		grpc.MaxCallRecvMsgSize(1010),
 	}
 	tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
@@ -6400,7 +6400,7 @@
 	observedOpts = observedOptions{} // reset
 
 	tc.StreamingInputCall(context.Background(),
-		grpc.FailFast(true),
+		grpc.WaitForReady(false),
 		grpc.MaxCallSendMsgSize(2020),
 		grpc.UseCompressor("comp-type"),
 		grpc.CallContentSubtype("json"))
@@ -6787,7 +6787,7 @@ func TestDisabledIOBuffers(t *testing.T) {
 	c := testpb.NewTestServiceClient(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("Failed to send test RPC to server")
 	}
diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go
index 016ab4a62..baf535d74 100644
--- a/test/healthcheck_test.go
+++ b/test/healthcheck_test.go
@@ -296,7 +296,7 @@ func TestHealthCheckWithGoAway(t *testing.T) {
 	// the stream rpc will persist through goaway event.
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
@@ -456,7 +456,7 @@ func TestHealthCheckWithAddrConnDrain(t *testing.T) {
 	// the stream rpc will persist through goaway event.
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
+	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
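
Reviewer note (illustrative, not part of the patch): the WaitForReady doc comment added in rpc_util.go above describes the new option; the sketch below shows what the mechanical migration looks like at a call site, with the boolean sense inverted relative to FailFast. The server address is a placeholder, and the interop grpc_testing client is used only because this patch already touches it in stress/client; any generated client behaves the same way.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

func main() {
	// Placeholder address; grpc.WithInsecure was the usual dial option at
	// the time of this patch.
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	tc := testpb.NewTestServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Old spelling (still compiles, now deprecated):
	//   tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
	// New spelling: block until a connection is ready (or ctx expires)
	// instead of failing the RPC immediately on a broken connection.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		log.Fatalf("EmptyCall: %v", err)
	}
}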