client: propagate connection error causes to RPC statuses (#4311)

apolcyn 2021-04-13 13:06:05 -07:00 committed by GitHub
parent 7a6ab59115
commit c229922995
6 changed files with 114 additions and 61 deletions
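The change that recurs throughout the hunks below is a signature change: ClientTransport.Close() error becomes Close(err error), and the cause passed in is attached to the codes.Unavailable status of any streams the transport tears down. A minimal, self-contained sketch of the new call shape, assuming nothing beyond the diff (the closer interface and fakeTransport type are illustrative stand-ins, not code from this commit):

package main

import (
	"errors"
	"fmt"
)

// closer mirrors the shape of the updated ClientTransport.Close for
// illustration; the real interface lives in the transport hunk later in
// this diff.
type closer interface {
	Close(err error)
}

// fakeTransport is a stand-in so the example compiles; it mimics how the
// real http2Client now turns the close cause into the status message of any
// streams it tears down.
type fakeTransport struct{}

func (fakeTransport) Close(err error) {
	fmt.Println("closing active streams with status Unavailable:", err)
}

func main() {
	var t closer = fakeTransport{}
	// The error passed here is what in-flight RPCs on this transport report.
	t.Close(errors.New("keepalive ping failed to receive ACK within timeout"))
}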

View File

@ -1197,7 +1197,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
@ -1329,7 +1329,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:

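The createTransport hunk above waits for the server preface by selecting between a timer and a notification channel. As context, here is a self-contained sketch of that select-with-deadline pattern; waitForPreface is a made-up helper name, while prefaceReceived and connectDeadline mirror the names visible in the hunk:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForPreface is an illustrative helper: it either observes the preface
// notification or gives up when the connect deadline expires, mirroring the
// select in createTransport above.
func waitForPreface(prefaceReceived <-chan struct{}, connectDeadline time.Time) error {
	select {
	case <-time.After(time.Until(connectDeadline)):
		return errors.New("failed to receive server preface within timeout")
	case <-prefaceReceived:
		return nil
	}
}

func main() {
	ch := make(chan struct{})
	close(ch) // pretend the server preface already arrived
	fmt.Println(waitForPreface(ch, time.Now().Add(time.Second)))
}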
View File

@ -347,12 +347,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err
}
if n != len(clientPreface) {
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err
}
var ss []http2.Setting
@ -370,14 +372,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err
}
}
@ -845,12 +849,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error {
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
return nil
return
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
@ -866,13 +870,13 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@ -880,7 +884,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
return err
}
// GracefulClose sets the state to draining, which prevents new streams from
@ -899,7 +902,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(ErrConnClosing)
return
}
t.controlBuf.put(&incomingGoAway{})
@ -1147,7 +1150,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
@ -1165,7 +1168,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
}
default:
@ -1195,7 +1198,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
}
}
@ -1313,7 +1316,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return
err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
@ -1322,7 +1326,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close() // this kicks off resetTransport, so must be last before return
// this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return
}
t.onPrefaceReceipt()
@ -1358,7 +1363,7 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
t.Close()
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return
}
}
@ -1417,7 +1422,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return
}
t.mu.Lock()

View File

@ -24,6 +24,7 @@ package transport
import (
"context"
"fmt"
"io"
"net"
"testing"
@ -47,7 +48,7 @@ func (s) TestMaxConnectionIdle(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -86,7 +87,7 @@ func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -122,7 +123,7 @@ func (s) TestMaxConnectionAge(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -169,7 +170,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -228,7 +229,7 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -257,7 +258,7 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
conn, ok := <-connCh
if !ok {
@ -288,7 +289,7 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
conn, ok := <-connCh
if !ok {
@ -317,7 +318,7 @@ func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
Timeout: 1 * time.Second,
}}, connCh)
defer cancel()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
conn, ok := <-connCh
if !ok {
@ -352,7 +353,7 @@ func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
PermitWithoutStream: true,
}})
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -391,7 +392,7 @@ func (s) TestKeepaliveClientFrequency(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -436,7 +437,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -480,7 +481,7 @@ func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -530,7 +531,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -564,7 +565,7 @@ func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -604,7 +605,7 @@ func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()
@ -658,7 +659,7 @@ func (s) TestTCPUserTimeout(t *testing.T) {
},
)
defer func() {
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
server.stop()
cancel()
}()

View File

@ -622,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this
// is called only once.
Close() error
Close(err error)
// GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are

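The doc comment above obliges callers to invoke Close only once; the http2Client.Close hunk earlier in this diff additionally guards against repeated calls by checking its state under the mutex before tearing anything down. A minimal sketch of that idempotence guard, using hypothetical names (conn, closed) rather than the real http2Client fields:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a toy stand-in for a client transport; the field names are
// illustrative, not the real http2Client fields.
type conn struct {
	mu     sync.Mutex
	closed bool
}

// Close acts on the first call only and ignores later ones, mirroring the
// "make sure we only Close once" guard in http2Client.Close above.
func (c *conn) Close(err error) {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.closed = true
	c.mu.Unlock()
	fmt.Println("tearing down streams with cause:", err)
}

func main() {
	c := &conn{}
	c.Close(errors.New("received goaway and there are no active streams"))
	c.Close(errors.New("second close is a no-op")) // prints nothing
}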
View File

@ -481,7 +481,7 @@ func (s) TestInflightStreamClosing(t *testing.T) {
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@ -550,7 +550,7 @@ func (s) TestClientSendAndReceive(t *testing.T) {
if recvErr != io.EOF {
t.Fatalf("Error: %v; want <EOF>", recvErr)
}
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
server.stop()
}
@ -560,7 +560,7 @@ func (s) TestClientErrorNotify(t *testing.T) {
go server.stop()
// ct.reader should detect the error and activate ct.Error().
<-ct.Error()
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
}
func performOneRPC(ct ClientTransport) {
@ -597,7 +597,7 @@ func (s) TestClientMix(t *testing.T) {
}(s)
go func(ct ClientTransport) {
<-ct.Error()
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
}(ct)
for i := 0; i < 1000; i++ {
time.Sleep(10 * time.Millisecond)
@ -636,7 +636,7 @@ func (s) TestLargeMessage(t *testing.T) {
}()
}
wg.Wait()
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
server.stop()
}
@ -653,7 +653,7 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) {
server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co)
defer cancel()
defer server.stop()
defer ct.Close()
defer ct.Close(fmt.Errorf("closed manually by test"))
server.mu.Lock()
ready := server.ready
server.mu.Unlock()
@ -831,7 +831,7 @@ func (s) TestLargeMessageSuspension(t *testing.T) {
if _, err := s.Read(make([]byte, 8)); err.Error() != expectedErr.Error() {
t.Fatalf("Read got %v of type %T, want %v", err, err, expectedErr)
}
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
server.stop()
}
@ -841,7 +841,7 @@ func (s) TestMaxStreams(t *testing.T) {
}
server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer ct.Close()
defer ct.Close(fmt.Errorf("closed manually by test"))
defer server.stop()
callHdr := &CallHdr{
Host: "localhost",
@ -901,7 +901,7 @@ func (s) TestMaxStreams(t *testing.T) {
// Close the first stream created so that the new stream can finally be created.
ct.CloseStream(s, nil)
<-done
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
<-ct.writerDone
if ct.maxConcurrentStreams != 1 {
t.Fatalf("ct.maxConcurrentStreams: %d, want 1", ct.maxConcurrentStreams)
@ -960,7 +960,7 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
sc.mu.Unlock()
break
}
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
select {
case <-ss.Context().Done():
if ss.Context().Err() != context.Canceled {
@ -980,7 +980,7 @@ func (s) TestClientConnDecoupledFromApplicationRead(t *testing.T) {
server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
defer cancel()
defer server.stop()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
@ -1069,7 +1069,7 @@ func (s) TestServerConnDecoupledFromApplicationRead(t *testing.T) {
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer cancel()
defer server.stop()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
@ -1302,7 +1302,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
defer ct.Close()
defer ct.Close(fmt.Errorf("closed manually by test"))
str, err := ct.NewStream(connectCtx, &CallHdr{})
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
@ -1345,7 +1345,7 @@ func (s) TestEncodingRequiredStatus(t *testing.T) {
if !testutils.StatusErrEqual(s.Status().Err(), encodingTestStatus.Err()) {
t.Fatalf("stream with status %v, want %v", s.Status(), encodingTestStatus)
}
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
server.stop()
}
@ -1367,7 +1367,7 @@ func (s) TestInvalidHeaderField(t *testing.T) {
if se, ok := status.FromError(err); !ok || se.Code() != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.Internal, expectedInvalidHeaderField)
}
ct.Close()
ct.Close(fmt.Errorf("closed manually by test"))
server.stop()
}
@ -1375,7 +1375,7 @@ func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
defer cancel()
defer server.stop()
defer ct.Close()
defer ct.Close(fmt.Errorf("closed manually by test"))
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
s, err := ct.NewStream(ctx, &CallHdr{Host: "localhost", Method: "foo"})
@ -1481,7 +1481,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co)
defer cancel()
defer server.stop()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
@ -1563,7 +1563,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
}
// Close down both server and client so that their internals can be read without data
// races.
client.Close()
client.Close(fmt.Errorf("closed manually by test"))
st.Close()
<-st.readerDone
<-st.writerDone
@ -1762,7 +1762,7 @@ func runPingPongTest(t *testing.T, msgSize int) {
server, client, cancel := setUp(t, 0, 0, pingpong)
defer cancel()
defer server.stop()
defer client.Close()
defer client.Close(fmt.Errorf("closed manually by test"))
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
@ -1850,7 +1850,7 @@ func (s) TestHeaderTblSize(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
defer ct.Close()
defer ct.Close(fmt.Errorf("closed manually by test"))
defer server.stop()
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
@ -1969,7 +1969,7 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
defer tr.Close()
defer tr.Close(fmt.Errorf("closed manually by test"))
wantAttr := attributes.New(testAttrKey, testAttrVal)
if gotAttr := creds.attr; !cmp.Equal(gotAttr, wantAttr, cmp.AllowUnexported(attributes.Attributes{})) {

View File

@ -1333,6 +1333,53 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
awaitNewConnLogOutput()
}
func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
rpcStartedOnServer := make(chan struct{})
rpcDoneOnClient := make(chan struct{})
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
close(rpcStartedOnServer)
<-rpcDoneOnClient
return status.Error(codes.Internal, "arbitrary status")
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// The precise behavior of this test is subject to raciness around the timing of when TCP packets
// are sent from client to server, and when we tell the server to stop, so we need to account for both
// of these possible error messages:
// 1) If the call to ss.S.Stop() causes the server's sockets to close while there's still in-flight
// data from the client on the TCP connection, then the kernel can send an RST back to the client (also
// see https://stackoverflow.com/questions/33053507/econnreset-in-send-linux-c). Note that while this
// condition is expected to be rare due to the rpcStartedOnServer synchronization, in theory it should
// be possible, e.g. if the client sends a BDP ping at the right time.
// 2) If, for example, the call to ss.S.Stop() happens after the RPC headers have been received at the
// server, then the TCP connection can shut down gracefully when the server's socket closes.
const possibleConnResetMsg = "connection reset by peer"
const possibleEOFMsg = "error reading from server: EOF"
// Start an RPC. Then, while the RPC is still being accepted or handled at the server, abruptly
// stop the server, killing the connection. The RPC error message should include details about the specific
// connection error that was encountered.
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
}
// Block until the RPC has been started on the server. This ensures that the ClientConn will find a healthy
// connection for the RPC to go out on initially, and that the TCP connection will shut down strictly after
// the RPC has been started on it.
<-rpcStartedOnServer
ss.S.Stop()
if _, err := stream.Recv(); err == nil || (!strings.Contains(err.Error(), possibleConnResetMsg) && !strings.Contains(err.Error(), possibleEOFMsg)) {
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q OR %q", stream, err, possibleConnResetMsg, possibleEOFMsg)
}
close(rpcDoneOnClient)
}
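Read alongside the test above, the user-visible effect of the commit is that the transport-level cause now shows up in the message of the codes.Unavailable status an RPC returns. A hedged sketch of how application code might surface that cause; logConnCause is a hypothetical helper, while status.FromError, Code, and Message are existing grpc-go API:

package main

import (
	"log"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// logConnCause is an illustrative helper, not part of this commit: it pulls
// the propagated connection cause out of an RPC error.
func logConnCause(rpcErr error) {
	if st, ok := status.FromError(rpcErr); ok && st.Code() == codes.Unavailable {
		// After this change, st.Message() can include the underlying
		// transport error, e.g. "connection reset by peer" or
		// "error reading from server: EOF".
		log.Printf("connection-level failure: %s", st.Message())
	}
}

func main() {
	// Simulate the kind of error an RPC would now return.
	err := status.Error(codes.Unavailable, "error reading from server: EOF")
	logConnCause(err)
}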
func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
for _, e := range listTestEnv() {
if e.name == "handler-tls" {