mirror of https://github.com/grpc/grpc-go.git
internal: fix GO_AWAY deadlock (#2391)
internal: fix GO_AWAY deadlock A deadlock can occur when a GO_AWAY is followed by a connection closure. This happens because onClose needlessly closes the current ac.transport: if a GO_AWAY already occured, and the transport was already reset, then the later closure (of the original address) sets ac.transport - which is now healthy - to nil. The manifestation of this problem is that picker_wrapper spins forever trying to use a READY connection whose ac.transport is nil.
This commit is contained in:
parent
39444b99c0
commit
ff2aa05958
|
|
@ -1066,9 +1066,6 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
||||||
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
||||||
return
|
return
|
||||||
case <-allowedToReset: // We're in the clear to reset.
|
case <-allowedToReset: // We're in the clear to reset.
|
||||||
ac.mu.Lock()
|
|
||||||
ac.transport = nil
|
|
||||||
ac.mu.Unlock()
|
|
||||||
oneReset.Do(func() { ac.resetTransport(false) })
|
oneReset.Do(func() { ac.resetTransport(false) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,7 @@ import (
|
||||||
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
|
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
|
||||||
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||||
"google.golang.org/grpc/internal/channelz"
|
"google.golang.org/grpc/internal/channelz"
|
||||||
|
"google.golang.org/grpc/internal/grpcsync"
|
||||||
"google.golang.org/grpc/internal/leakcheck"
|
"google.golang.org/grpc/internal/leakcheck"
|
||||||
"google.golang.org/grpc/internal/testutils"
|
"google.golang.org/grpc/internal/testutils"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
@ -6994,3 +6995,111 @@ func testLargeTimeout(t *testing.T, e env) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
|
||||||
|
// test ensures that the connection is re-created after GO_AWAY and not affected by the
|
||||||
|
// subsequent (old) connection closure.
|
||||||
|
func TestGoAwayThenClose(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
lis1, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error while listening. Err: %v", err)
|
||||||
|
}
|
||||||
|
s1 := grpc.NewServer()
|
||||||
|
defer s1.Stop()
|
||||||
|
ts1 := &funcServer{
|
||||||
|
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||||
|
return &testpb.SimpleResponse{}, nil
|
||||||
|
},
|
||||||
|
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
|
||||||
|
// Wait forever.
|
||||||
|
_, err := stream.Recv()
|
||||||
|
if err == nil {
|
||||||
|
t.Error("expected to never receive any message")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
}
|
||||||
|
testpb.RegisterTestServiceServer(s1, ts1)
|
||||||
|
go s1.Serve(lis1)
|
||||||
|
|
||||||
|
conn2Established := grpcsync.NewEvent()
|
||||||
|
lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error while listening. Err: %v", err)
|
||||||
|
}
|
||||||
|
s2 := grpc.NewServer()
|
||||||
|
defer s2.Stop()
|
||||||
|
conn2Ready := grpcsync.NewEvent()
|
||||||
|
ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||||
|
conn2Ready.Fire()
|
||||||
|
return &testpb.SimpleResponse{}, nil
|
||||||
|
}}
|
||||||
|
testpb.RegisterTestServiceServer(s2, ts2)
|
||||||
|
go s2.Serve(lis2)
|
||||||
|
|
||||||
|
r, rcleanup := manual.GenerateAndRegisterManualResolver()
|
||||||
|
defer rcleanup()
|
||||||
|
r.InitialAddrs([]resolver.Address{
|
||||||
|
{Addr: lis1.Addr().String()},
|
||||||
|
{Addr: lis2.Addr().String()},
|
||||||
|
})
|
||||||
|
cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error creating client: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
client := testpb.NewTestServiceClient(cc)
|
||||||
|
|
||||||
|
// Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the
|
||||||
|
// connection doesn't get closed until the server stops and the client receives.
|
||||||
|
stream, err := client.FullDuplexCall(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send GO_AWAY to connection 1.
|
||||||
|
go s1.GracefulStop()
|
||||||
|
|
||||||
|
// Wait for connection 2 to be established.
|
||||||
|
<-conn2Established.Done()
|
||||||
|
|
||||||
|
// Close connection 1.
|
||||||
|
s1.Stop()
|
||||||
|
|
||||||
|
// Wait for client to close.
|
||||||
|
_, err = stream.Recv()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected the stream to die, but got a successful Recv")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
|
||||||
|
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
|
||||||
|
lis, err := net.Listen(network, address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return notifyingListener{connEstablished: event, Listener: lis}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type notifyingListener struct {
|
||||||
|
connEstablished *grpcsync.Event
|
||||||
|
net.Listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lis notifyingListener) Accept() (net.Conn, error) {
|
||||||
|
defer lis.connEstablished.Fire()
|
||||||
|
return lis.Listener.Accept()
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue