mirror of https://github.com/grpc/grpc-go.git
Merge pull request #830 from menghanl/non_blocking_rpc_reget_transport
Make non-failfast RPC get new transport instead of waiting
This commit is contained in:
commit
c2781963b3
|
@ -320,3 +320,57 @@ func TestGetOnWaitChannel(t *testing.T) {
|
|||
cc.Close()
|
||||
servers[0].stop()
|
||||
}
|
||||
|
||||
func TestOneConnectionDown(t *testing.T) {
|
||||
// Start 2 servers.
|
||||
numServers := 2
|
||||
servers, r := startServers(t, numServers, math.MaxUint32)
|
||||
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||
}
|
||||
// Add servers[1] to the service discovery.
|
||||
var updates []*naming.Update
|
||||
updates = append(updates, &naming.Update{
|
||||
Op: naming.Add,
|
||||
Addr: "127.0.0.1:" + servers[1].port,
|
||||
})
|
||||
r.w.inject(updates)
|
||||
req := "port"
|
||||
var reply string
|
||||
// Loop until servers[1] is up
|
||||
for {
|
||||
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
numRPC := 100
|
||||
sleepDuration := 10 * time.Millisecond
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(sleepDuration)
|
||||
// After sleepDuration, kill server[0].
|
||||
servers[0].stop()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// All non-failfast RPCs should not block because there's at least one connection available.
|
||||
for i := 0; i < numRPC; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(sleepDuration)
|
||||
// After sleepDuration, invoke RPC.
|
||||
// server[0] is killed around the same time to make it racey between balancer and gRPC internals.
|
||||
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
cc.Close()
|
||||
for i := 0; i < numServers; i++ {
|
||||
servers[i].stop()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/transport"
|
||||
|
@ -502,7 +501,11 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|||
}
|
||||
return nil, nil, errConnClosing
|
||||
}
|
||||
t, err := ac.wait(ctx, !opts.BlockingWait)
|
||||
// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
|
||||
// - If RPC is failfast, ac.wait should not block.
|
||||
// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
|
||||
// so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
|
||||
t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
|
||||
if err != nil {
|
||||
if put != nil {
|
||||
put()
|
||||
|
@ -754,8 +757,8 @@ func (ac *addrConn) transportMonitor() {
|
|||
}
|
||||
|
||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
||||
// iv) transport is in TransientFailure and the RPC is fail-fast.
|
||||
func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
|
||||
// iv) transport is in TransientFailure and blocking is false.
|
||||
func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
|
||||
for {
|
||||
ac.mu.Lock()
|
||||
switch {
|
||||
|
@ -767,9 +770,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
|
|||
ct := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return ct, nil
|
||||
case ac.state == TransientFailure && failFast:
|
||||
case ac.state == TransientFailure && !blocking:
|
||||
ac.mu.Unlock()
|
||||
return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
|
||||
return nil, errConnClosing
|
||||
default:
|
||||
ready := ac.ready
|
||||
if ready == nil {
|
||||
|
|
Loading…
Reference in New Issue