From e10de7abd1a58d4c4e1103bbf3235b523be19979 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 13 Mar 2015 00:16:18 -0700 Subject: [PATCH] fix some bugs --- call.go | 13 ++++--- clientconn.go | 2 +- rpc_util.go | 4 +- test/end2end_test.go | 6 +-- transport/http2_client.go | 28 ++++++++------ transport/http2_server.go | 1 + transport/transport_test.go | 74 ++++++------------------------------- 7 files changed, 43 insertions(+), 85 deletions(-) diff --git a/call.go b/call.go index 10bd8a545..5b03a3f54 100644 --- a/call.go +++ b/call.go @@ -34,14 +34,14 @@ package grpc import ( - "io" - "net" - "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/transport" + "io" + "log" + "net" ) // recv receives and parses an RPC response. @@ -127,8 +127,10 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C Last: true, Delay: false, } - ts := 0 - var lastErr error // record the error that happened + var ( + ts int // track the transport sequence number + lastErr error // record the error that happened + ) for { var ( err error @@ -165,6 +167,7 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C } t.CloseStream(stream, lastErr) if lastErr != nil { + log.Println("exit 5: ", lastErr) return toRPCErr(lastErr) } return Errorf(stream.StatusCode(), stream.StatusDesc()) diff --git a/clientconn.go b/clientconn.go index bfb0ff4b3..8ac660a40 100644 --- a/clientconn.go +++ b/clientconn.go @@ -156,7 +156,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { if err != nil { sleepTime := backoff(retries) // Fail early before falling into sleep. - if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) { + if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime+time.Since(start) { cc.Close() return ErrClientConnTimeout } diff --git a/rpc_util.go b/rpc_util.go index 5874be213..0107d8180 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -200,7 +200,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error { } } -// toRPCErr converts a transport error into a rpcError if possible. +// toRPCErr converts an error into a rpcError. func toRPCErr(err error) error { switch e := err.(type) { case transport.StreamError: @@ -214,7 +214,7 @@ func toRPCErr(err error) error { desc: e.Desc, } } - return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err) + return Errorf(codes.Unknown, "%v", err) } // convertCode converts a standard Go error into its canonical code. Note that diff --git a/test/end2end_test.go b/test/end2end_test.go index e516bdf1a..a678b5360 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -239,6 +239,7 @@ func TestReconnectTimeout(t *testing.T) { if err != nil { t.Fatalf("Failed to dial to the server %q: %v", addr, err) } + // Close unaccepted connection (i.e., conn). lis.Close() tc := testpb.NewTestServiceClient(conn) waitC := make(chan struct{}) @@ -251,9 +252,8 @@ func TestReconnectTimeout(t *testing.T) { ResponseSize: proto.Int32(int32(respSize)), Payload: newPayload(testpb.PayloadType_COMPRESSABLE, int32(argSize)), } - _, err := tc.UnaryCall(context.Background(), req) - if err != grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing) { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %v", err, grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing)) + if _, err := tc.UnaryCall(context.Background(), req); err == nil { + t.Fatalf("TestService/UnaryCall(_, _) = _, , want _, non-nil") } }() // Block untill reconnect times out. diff --git a/transport/http2_client.go b/transport/http2_client.go index 54008e7cf..ae08af260 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -209,16 +209,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, ContextErr(context.DeadlineExceeded) } } - // HPACK encodes various headers. - t.hBuf.Reset() - t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) - t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) - t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) + var authData map[string]string for _, c := range t.authCreds { - m, err := c.GetRequestMetadata(ctx) + var err error + authData, err = c.GetRequestMetadata(ctx) select { case <-ctx.Done(): return nil, ContextErr(ctx.Err()) @@ -227,13 +221,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if err != nil { return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err) } - for k, v := range m { - t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) - } } + // HPACK encodes various headers. Note that once WriteField(...) is + // called, the corresponding headers/continuation frame has to be sent + // because hpack.Encoder is stateful. + t.hBuf.Reset() + t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) + t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) + t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) + t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) + t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) if timeout > 0 { t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)}) } + for k, v := range authData { + t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) + } if md, ok := metadata.FromContext(ctx); ok { for k, v := range md { t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) diff --git a/transport/http2_server.go b/transport/http2_server.go index 65bfc8b1e..d9f833d78 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -205,6 +205,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { frame, err := t.framer.ReadFrame() if err != nil { + log.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err) t.Close() return } diff --git a/transport/transport_test.go b/transport/transport_test.go index bfd2c54cd..1a284072b 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -230,8 +230,8 @@ func TestClientSendAndReceive(t *testing.T) { if recvErr != io.EOF { t.Fatalf("Error: %v; want ", recvErr) } - closeClient(ct, t) - closeServer(server, t) + ct.Close() + server.Close() } func TestClientErrorNotify(t *testing.T) { @@ -248,10 +248,10 @@ func TestClientErrorNotify(t *testing.T) { t.Fatalf("wrong stream id: %d", s.id) } // Tear down the server. - go closeServer(server, t) + go server.Close() // ct.reader should detect the error and activate ct.Error(). <-ct.Error() - closeClient(ct, t) + ct.Close() } func performOneRPC(ct ClientTransport) { @@ -284,11 +284,11 @@ func TestClientMix(t *testing.T) { s, ct := setUp(t, true, 0, math.MaxUint32, false) go func(s *server) { time.Sleep(5 * time.Second) - closeServer(s, t) + s.Close() }(s) go func(ct ClientTransport) { <-ct.Error() - closeClient(ct, t) + ct.Close() }(ct) for i := 0; i < 1000; i++ { time.Sleep(10 * time.Millisecond) @@ -299,8 +299,8 @@ func TestClientMix(t *testing.T) { func TestExceedMaxStreamsLimit(t *testing.T) { server, ct := setUp(t, true, 0, 1, false) defer func() { - closeClient(ct, t) - closeServer(server, t) + ct.Close() + server.Close() }() callHdr := &CallHdr{ Host: "localhost", @@ -374,8 +374,8 @@ func TestLargeMessage(t *testing.T) { }() } wg.Wait() - closeClient(ct, t) - closeServer(server, t) + ct.Close() + server.Close() } func TestLargeMessageSuspension(t *testing.T) { @@ -396,8 +396,8 @@ func TestLargeMessageSuspension(t *testing.T) { if err == nil || err != expectedErr { t.Fatalf("Write got %v, want %v", err, expectedErr) } - closeClient(ct, t) - closeServer(server, t) + ct.Close() + server.Close() } func TestStreamContext(t *testing.T) { @@ -408,53 +408,3 @@ func TestStreamContext(t *testing.T) { t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, *s, ok, expectedStream) } } - -// closeClient shuts down the ClientTransport and reports any errors to the -// test framework and terminates the current test case. -func closeClient(ct ClientTransport, t *testing.T) { - if err := ct.Close(); err != nil { - t.Fatalf("ct.Close() = %v, want ", err) - } -} - -// closeServerWithErr shuts down the testing server, closing the associated -// transports. It returns the first error it encounters, if any. -func closeServerWithErr(s *server) error { - // Keep consistent with s.Close(). - s.lis.Close() - s.mu.Lock() - defer s.mu.Unlock() - for c := range s.conns { - if err := c.Close(); err != nil { - return err - } - } - return nil -} - -// closeServer shuts down the and testing server, closing the associated -// transport. It reports any errors to the test framework and terminates the -// current test case. -func closeServer(s *server, t *testing.T) { - if err := closeServerWithErr(s); err != nil { - t.Fatalf("server.Close() = %v, want ", err) - } -} - -func TestClientServerDuplicatedClose(t *testing.T) { - server, ct := setUp(t, true, 0, math.MaxUint32, false) - if err := ct.Close(); err != nil { - t.Fatalf("ct.Close() = %v, want ", err) - } - if err := ct.Close(); err == nil { - // Duplicated closes should gracefully issue an error. - t.Fatalf("ct.Close() = , want non-nil") - } - if err := closeServerWithErr(server); err != nil { - t.Fatalf("closeServerWithErr(server) = %v, want ", err) - } - if err := closeServerWithErr(server); err == nil { - // Duplicated closes should gracefully issue an error. - t.Fatalf("closeServerWithErr(server) = , want non-nil") - } -}