From b2d4d5dbae052c23959f5adb63eae94e6bd6a893 Mon Sep 17 00:00:00 2001 From: Arvind Bright Date: Mon, 19 Dec 2022 12:57:49 -0600 Subject: [PATCH] test: fix raceyness check to deflake test http server (#5866) Fixes https://github.com/grpc/grpc-go/issues/4990 --- test/end2end_test.go | 104 +++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 33 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 61907e4ca..ae536520f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -888,6 +888,32 @@ type lazyConn struct { beLazy int32 } +// possible conn closed errors. +const possibleConnResetMsg = "connection reset by peer" +const possibleEOFMsg = "error reading from server: EOF" + +// isConnClosedErr checks the error msg for possible conn closed messages. There +// is a raceyness in the timing of when TCP packets are sent from client to +// server, and when we tell the server to stop, so we need to check for both of +// these possible error messages: +// 1. If the call to ss.S.Stop() causes the server's sockets to close while +// there's still in-fight data from the client on the TCP connection, then +// the kernel can send an RST back to the client (also see +// https://stackoverflow.com/questions/33053507/econnreset-in-send-linux-c). +// Note that while this condition is expected to be rare due to the +// test httpServer start synchronization, in theory it should be possible, +// e.g. if the client sends a BDP ping at the right time. +// 2. If, for example, the call to ss.S.Stop() happens after the RPC headers +// have been received at the server, then the TCP connection can shutdown +// gracefully when the server's socket closes. +// 3. If there is an actual io.EOF received because the client stopped the stream. +func isConnClosedErr(err error) bool { + errContainsConnResetMsg := strings.Contains(err.Error(), possibleConnResetMsg) + errContainsEOFMsg := strings.Contains(err.Error(), possibleEOFMsg) + + return errContainsConnResetMsg || errContainsEOFMsg || err == io.EOF +} + func (l *lazyConn) Write(b []byte) (int, error) { if atomic.LoadInt32(&(l.beLazy)) == 1 { time.Sleep(time.Second) @@ -1013,18 +1039,7 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // The precise behavior of this test is subject to raceyness around the timing of when TCP packets - // are sent from client to server, and when we tell the server to stop, so we need to account for both - // of these possible error messages: - // 1) If the call to ss.S.Stop() causes the server's sockets to close while there's still in-fight - // data from the client on the TCP connection, then the kernel can send an RST back to the client (also - // see https://stackoverflow.com/questions/33053507/econnreset-in-send-linux-c). Note that while this - // condition is expected to be rare due to the rpcStartedOnServer synchronization, in theory it should - // be possible, e.g. if the client sends a BDP ping at the right time. - // 2) If, for example, the call to ss.S.Stop() happens after the RPC headers have been received at the - // server, then the TCP connection can shutdown gracefully when the server's socket closes. - const possibleConnResetMsg = "connection reset by peer" - const possibleEOFMsg = "error reading from server: EOF" + // Start an RPC. Then, while the RPC is still being accepted or handled at the server, abruptly // stop the server, killing the connection. The RPC error message should include details about the specific // connection error that was encountered. @@ -1037,7 +1052,10 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) { // the RPC has been started on it. <-rpcStartedOnServer ss.S.Stop() - if _, err := stream.Recv(); err == nil || (!strings.Contains(err.Error(), possibleConnResetMsg) && !strings.Contains(err.Error(), possibleEOFMsg)) { + // The precise behavior of this test is subject to raceyness around the timing + // of when TCP packets are sent from client to server, and when we tell the + // server to stop, so we need to account for both possible error messages. + if _, err := stream.Recv(); err == io.EOF || !isConnClosedErr(err) { t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q OR %q", stream, err, possibleConnResetMsg, possibleEOFMsg) } close(rpcDoneOnClient) @@ -6739,31 +6757,37 @@ func (s) TestRPCWaitsForResolver(t *testing.T) { func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { // Non-gRPC content-type fallback path. for httpCode := range transport.HTTPStatusConvTab { - doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ + if err := doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ ":status", fmt.Sprintf("%d", httpCode), "content-type", "text/html", // non-gRPC content type to switch to HTTP mode. "grpc-status", "1", // Make up a gRPC status error "grpc-status-details-bin", "???", // Make up a gRPC field parsing error - }) + }); err != nil { + t.Error(err) + } } // Missing content-type fallback path. for httpCode := range transport.HTTPStatusConvTab { - doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ + if err := doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ ":status", fmt.Sprintf("%d", httpCode), // Omitting content type to switch to HTTP mode. "grpc-status", "1", // Make up a gRPC status error "grpc-status-details-bin", "???", // Make up a gRPC field parsing error - }) + }); err != nil { + t.Error(err) + } } // Malformed HTTP status when fallback. - doHTTPHeaderTest(t, codes.Internal, []string{ + if err := doHTTPHeaderTest(t, codes.Internal, []string{ ":status", "abc", // Omitting content type to switch to HTTP mode. "grpc-status", "1", // Make up a gRPC status error "grpc-status-details-bin", "???", // Make up a gRPC field parsing error - }) + }); err != nil { + t.Error(err) + } } // Testing erroneous ResponseHeader or Trailers-only (delivered in the first HEADERS frame). @@ -6809,18 +6833,22 @@ func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { errCode: codes.Unavailable, }, } { - doHTTPHeaderTest(t, test.errCode, test.header) + if err := doHTTPHeaderTest(t, test.errCode, test.header); err != nil { + t.Error(err) + } } } // Testing non-Trailers-only Trailers (delivered in second HEADERS frame) func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { - for _, test := range []struct { + tests := []struct { + name string responseHeader []string trailer []string errCode codes.Code }{ { + name: "trailer missing grpc-status", responseHeader: []string{ ":status", "200", "content-type", "application/grpc", @@ -6832,6 +6860,7 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { errCode: codes.Unavailable, }, { + name: "malformed grpc-status-details-bin field with status 404", responseHeader: []string{ ":status", "404", "content-type", "application/grpc", @@ -6844,6 +6873,7 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { errCode: codes.Unimplemented, }, { + name: "malformed grpc-status-details-bin field with status 200", responseHeader: []string{ ":status", "200", "content-type", "application/grpc", @@ -6855,8 +6885,14 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { }, errCode: codes.Internal, }, - } { - doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer) + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer); err != nil { + t.Error(err) + } + }) + } } @@ -6865,7 +6901,9 @@ func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) { ":status", "200", "content-type", "application/grpc", } - doHTTPHeaderTest(t, codes.Internal, header, header, header) + if err := doHTTPHeaderTest(t, codes.Internal, header, header, header); err != nil { + t.Fatal(err) + } } type httpServerResponse struct { @@ -6930,14 +6968,14 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) { } writer.Flush() // necessary since client is expecting preface before declaring connection fully setup. var sid uint32 - // Loop until conn is closed and framer returns io.EOF + // Loop until framer returns possible conn closed errors. for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) { // Read frames until a header is received. for { frame, err := framer.ReadFrame() if err != nil { - if err != io.EOF { - t.Errorf("Error at server-side while reading frame. Err: %v", err) + if !isConnClosedErr(err) { + t.Errorf("Error at server-side while reading frame. got: %q, want: rpc error containing substring %q OR %q", err, possibleConnResetMsg, possibleEOFMsg) } return } @@ -6994,11 +7032,10 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) { }() } -func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) { - t.Helper() +func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) error { lis, err := net.Listen("tcp", "localhost:0") if err != nil { - t.Fatalf("Failed to listen. Err: %v", err) + return fmt.Errorf("listening on %q: %v", "localhost:0", err) } defer lis.Close() server := &httpServer{ @@ -7007,7 +7044,7 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string server.start(t, lis) cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("failed to dial due to err: %v", err) + return fmt.Errorf("dial(%q): %v", lis.Addr().String(), err) } defer cc.Close() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -7015,11 +7052,12 @@ func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string client := testpb.NewTestServiceClient(cc) stream, err := client.FullDuplexCall(ctx) if err != nil { - t.Fatalf("error creating stream due to err: %v", err) + return fmt.Errorf("creating FullDuplex stream: %v", err) } if _, err := stream.Recv(); err == nil || status.Code(err) != errCode { - t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) + return fmt.Errorf("stream.Recv() = %v, want error code: %v", err, errCode) } + return nil } func (s) TestClientCancellationPropagatesUnary(t *testing.T) {