fix some bugs

iamqizhao 2015-03-13 00:16:18 -07:00
parent ebb6f762cb
commit e10de7abd1
7 changed files with 43 additions and 85 deletions

call.go

@ -34,14 +34,14 @@
package grpc
import (
"io"
"net"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport"
"io"
"log"
"net"
)
// recv receives and parses an RPC response.
@ -127,8 +127,10 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C
Last: true,
Delay: false,
}
ts := 0
var lastErr error // record the error that happened
var (
ts int // track the transport sequence number
lastErr error // record the error that happened
)
for {
var (
err error
@ -165,6 +167,7 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C
}
t.CloseStream(stream, lastErr)
if lastErr != nil {
log.Println("exit 5: ", lastErr)
return toRPCErr(lastErr)
}
return Errorf(stream.StatusCode(), stream.StatusDesc())
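The grouped var block above documents what ts tracks (the transport sequence number used across retries), and the new log.Println call, which is why call.go now imports "log", records the final error just before it is converted by toRPCErr. Below is a minimal, self-contained sketch of the same loop shape; attempt and the retry limit are hypothetical stand-ins, not the real transport plumbing in call.go:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// attempt stands in for one try of the RPC on transport number ts.
func attempt(ts int) error {
	if ts < 2 {
		return errors.New("transport is closing") // pretend the first transports die
	}
	return nil
}

// invoke mirrors the shape of Invoke in call.go: track the transport
// sequence number and the last error, convert the error on exit.
func invoke() error {
	var (
		ts      int   // track the transport sequence number
		lastErr error // record the error that happened
	)
	for {
		err := attempt(ts)
		if err == nil {
			return nil
		}
		lastErr = err
		if ts >= 3 { // give up after a few transports
			break
		}
		ts++
	}
	log.Println("invoke failed: ", lastErr)
	return fmt.Errorf("rpc error: %v", lastErr)
}

func main() {
	fmt.Println(invoke())
}
```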

clientconn.go

@ -156,7 +156,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
if err != nil {
sleepTime := backoff(retries)
// Fail early before falling into sleep.
if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime + time.Since(start) {
if cc.dopts.Timeout > 0 && cc.dopts.Timeout < sleepTime+time.Since(start) {
cc.Close()
return ErrClientConnTimeout
}
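The only change here is gofmt spacing around sleepTime+time.Since(start); the surrounding logic is the early-exit check that aborts the dial when the next backoff sleep would overshoot the user-supplied Timeout. A small hedged sketch of that shape, with backoff and connect as stand-ins for the real retry policy and transport dial:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("dial timed out") // stand-in for grpc.ErrClientConnTimeout

// backoff is a hypothetical stand-in for the package's retry backoff policy.
func backoff(retries int) time.Duration {
	return time.Duration(retries+1) * 100 * time.Millisecond
}

// dialWithTimeout shows the early-exit shape used by resetTransport: before
// sleeping, check whether the sleep would push us past the overall timeout.
func dialWithTimeout(timeout time.Duration, connect func() error) error {
	start := time.Now()
	for retries := 0; ; retries++ {
		if err := connect(); err == nil {
			return nil
		}
		sleepTime := backoff(retries)
		// Fail early instead of sleeping past the deadline.
		if timeout > 0 && timeout < sleepTime+time.Since(start) {
			return errTimeout
		}
		time.Sleep(sleepTime)
	}
}

func main() {
	err := dialWithTimeout(300*time.Millisecond, func() error { return errors.New("refused") })
	fmt.Println(err)
}
```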

rpc_util.go

@ -200,7 +200,7 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
}
}
// toRPCErr converts a transport error into a rpcError if possible.
// toRPCErr converts an error into a rpcError.
func toRPCErr(err error) error {
switch e := err.(type) {
case transport.StreamError:
@ -214,7 +214,7 @@ func toRPCErr(err error) error {
desc: e.Desc,
}
}
return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err)
return Errorf(codes.Unknown, "%v", err)
}
// convertCode converts a standard Go error into its canonical code. Note that
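The doc comment and the fallback message change together: toRPCErr now accepts any error, and an unrecognized error is surfaced verbatim under codes.Unknown instead of being hidden behind a "failed to convert" message. A simplified, self-contained sketch of that conversion; the code and rpcError types here are stand-ins for codes.Code and the unexported rpcError in rpc_util.go:

```go
package main

import (
	"errors"
	"fmt"
)

// code and rpcError are simplified stand-ins for codes.Code and the
// unexported rpcError type used by grpc-go.
type code int

const codeUnknown code = 2

type rpcError struct {
	code code
	desc string
}

func (e rpcError) Error() string {
	return fmt.Sprintf("rpc error: code = %d desc = %q", e.code, e.desc)
}

// toRPCErr keeps errors that are already rpcErrors and wraps everything
// else as codeUnknown, preserving the original error text.
func toRPCErr(err error) error {
	switch e := err.(type) {
	case rpcError:
		return e
	default:
		return rpcError{code: codeUnknown, desc: err.Error()}
	}
}

func main() {
	fmt.Println(toRPCErr(errors.New("connection reset by peer")))
}
```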

test/end2end_test.go

@ -239,6 +239,7 @@ func TestReconnectTimeout(t *testing.T) {
if err != nil {
t.Fatalf("Failed to dial to the server %q: %v", addr, err)
}
// Close unaccepted connection (i.e., conn).
lis.Close()
tc := testpb.NewTestServiceClient(conn)
waitC := make(chan struct{})
@ -251,9 +252,8 @@ func TestReconnectTimeout(t *testing.T) {
ResponseSize: proto.Int32(int32(respSize)),
Payload: newPayload(testpb.PayloadType_COMPRESSABLE, int32(argSize)),
}
_, err := tc.UnaryCall(context.Background(), req)
if err != grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing) {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %v", err, grpc.Errorf(codes.Internal, "%v", grpc.ErrClientConnClosing))
if _, err := tc.UnaryCall(context.Background(), req); err == nil {
t.Fatalf("TestService/UnaryCall(_, _) = _, <nil>, want _, non-nil")
}
}()
// Block until reconnect times out.

transport/http2_client.go

@ -209,16 +209,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ContextErr(context.DeadlineExceeded)
}
}
// HPACK encodes various headers.
t.hBuf.Reset()
t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
var authData map[string]string
for _, c := range t.authCreds {
m, err := c.GetRequestMetadata(ctx)
var err error
authData, err = c.GetRequestMetadata(ctx)
select {
case <-ctx.Done():
return nil, ContextErr(ctx.Err())
@ -227,13 +221,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if err != nil {
return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
}
for k, v := range m {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
}
}
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
// because hpack.Encoder is stateful.
t.hBuf.Reset()
t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
if timeout > 0 {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
}
for k, v := range authData {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
}
if md, ok := metadata.FromContext(ctx); ok {
for k, v := range md {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
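The reordering above is the substantive fix: hpack.Encoder is stateful, so once WriteField has been called the resulting HEADERS/CONTINUATION frames must actually be sent, which means anything that can still fail or block, such as fetching per-RPC auth metadata, has to happen before the first field is encoded. A small sketch of that ordering using golang.org/x/net/http2/hpack directly; fetchAuthMetadata is a stand-in for credentials.GetRequestMetadata:

```go
package main

import (
	"bytes"
	"fmt"

	"golang.org/x/net/http2/hpack"
)

// fetchAuthMetadata stands in for GetRequestMetadata: it may fail, so it
// must run before any header field is HPACK-encoded.
func fetchAuthMetadata() (map[string]string, error) {
	return map[string]string{"authorization": "Bearer fake-token"}, nil
}

func main() {
	var buf bytes.Buffer
	enc := hpack.NewEncoder(&buf)

	// 1. Gather everything that can fail first.
	authData, err := fetchAuthMetadata()
	if err != nil {
		fmt.Println("abort before encoding anything:", err)
		return
	}

	// 2. Only now touch the stateful encoder. Once WriteField is called,
	//    the resulting header block has to be flushed to the peer, because
	//    the encoder's dynamic table has already been updated.
	enc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
	enc.WriteField(hpack.HeaderField{Name: ":path", Value: "/svc/Method"})
	for k, v := range authData {
		enc.WriteField(hpack.HeaderField{Name: k, Value: v})
	}
	fmt.Printf("encoded %d bytes of header block\n", buf.Len())
}
```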

transport/http2_server.go

@ -205,6 +205,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
frame, err := t.framer.ReadFrame()
if err != nil {
log.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
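Adding t.Close() means a failed ReadFrame now tears down the whole server transport instead of just returning and leaving the dead connection dangling. The general pattern, sketched below with plain net types rather than the real http2Server, is to close the owning transport whenever its read loop exits on error:

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// serveConn sketches the read-loop rule used by HandleStreams: when reading
// the next frame (here, a byte) fails, close the transport before returning
// so nothing keeps referencing a dead connection.
func serveConn(conn net.Conn) {
	defer fmt.Println("transport closed")
	buf := make([]byte, 1)
	for {
		if _, err := conn.Read(buf); err != nil {
			if err != io.EOF {
				fmt.Println("read loop error:", err)
			}
			conn.Close() // the added t.Close() plays this role
			return
		}
	}
}

func main() {
	server, client := net.Pipe()
	go func() {
		client.Write([]byte("x"))
		client.Close() // makes the server's next Read fail
	}()
	serveConn(server)
}
```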

transport/transport_test.go

@ -230,8 +230,8 @@ func TestClientSendAndReceive(t *testing.T) {
if recvErr != io.EOF {
t.Fatalf("Error: %v; want <EOF>", recvErr)
}
closeClient(ct, t)
closeServer(server, t)
ct.Close()
server.Close()
}
func TestClientErrorNotify(t *testing.T) {
@ -248,10 +248,10 @@ func TestClientErrorNotify(t *testing.T) {
t.Fatalf("wrong stream id: %d", s.id)
}
// Tear down the server.
go closeServer(server, t)
go server.Close()
// ct.reader should detect the error and activate ct.Error().
<-ct.Error()
closeClient(ct, t)
ct.Close()
}
func performOneRPC(ct ClientTransport) {
@ -284,11 +284,11 @@ func TestClientMix(t *testing.T) {
s, ct := setUp(t, true, 0, math.MaxUint32, false)
go func(s *server) {
time.Sleep(5 * time.Second)
closeServer(s, t)
s.Close()
}(s)
go func(ct ClientTransport) {
<-ct.Error()
closeClient(ct, t)
ct.Close()
}(ct)
for i := 0; i < 1000; i++ {
time.Sleep(10 * time.Millisecond)
@ -299,8 +299,8 @@ func TestClientMix(t *testing.T) {
func TestExceedMaxStreamsLimit(t *testing.T) {
server, ct := setUp(t, true, 0, 1, false)
defer func() {
closeClient(ct, t)
closeServer(server, t)
ct.Close()
server.Close()
}()
callHdr := &CallHdr{
Host: "localhost",
@ -374,8 +374,8 @@ func TestLargeMessage(t *testing.T) {
}()
}
wg.Wait()
closeClient(ct, t)
closeServer(server, t)
ct.Close()
server.Close()
}
func TestLargeMessageSuspension(t *testing.T) {
@ -396,8 +396,8 @@ func TestLargeMessageSuspension(t *testing.T) {
if err == nil || err != expectedErr {
t.Fatalf("Write got %v, want %v", err, expectedErr)
}
closeClient(ct, t)
closeServer(server, t)
ct.Close()
server.Close()
}
func TestStreamContext(t *testing.T) {
@ -408,53 +408,3 @@ func TestStreamContext(t *testing.T) {
t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, *s, ok, expectedStream)
}
}
// closeClient shuts down the ClientTransport and reports any errors to the
// test framework and terminates the current test case.
func closeClient(ct ClientTransport, t *testing.T) {
if err := ct.Close(); err != nil {
t.Fatalf("ct.Close() = %v, want <nil>", err)
}
}
// closeServerWithErr shuts down the testing server, closing the associated
// transports. It returns the first error it encounters, if any.
func closeServerWithErr(s *server) error {
// Keep consistent with s.Close().
s.lis.Close()
s.mu.Lock()
defer s.mu.Unlock()
for c := range s.conns {
if err := c.Close(); err != nil {
return err
}
}
return nil
}
// closeServer shuts down the testing server, closing the associated
// transport. It reports any errors to the test framework and terminates the
// current test case.
func closeServer(s *server, t *testing.T) {
if err := closeServerWithErr(s); err != nil {
t.Fatalf("server.Close() = %v, want <nil>", err)
}
}
func TestClientServerDuplicatedClose(t *testing.T) {
server, ct := setUp(t, true, 0, math.MaxUint32, false)
if err := ct.Close(); err != nil {
t.Fatalf("ct.Close() = %v, want <nil>", err)
}
if err := ct.Close(); err == nil {
// Duplicated closes should gracefully issue an error.
t.Fatalf("ct.Close() = <nil>, want non-nil")
}
if err := closeServerWithErr(server); err != nil {
t.Fatalf("closeServerWithErr(server) = %v, want <nil>", err)
}
if err := closeServerWithErr(server); err == nil {
// Duplicated closes should gracefully issue an error.
t.Fatalf("closeServerWithErr(server) = <nil>, want non-nil")
}
}
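The deleted helpers treated any non-nil return from Close as a test failure, and TestClientServerDuplicatedClose pinned down that a second Close reports an error; after this commit the tests simply call ct.Close() and server.Close() and ignore the result, so duplicated-close behaviour is no longer asserted here. For reference, the behaviour the removed test checked looks roughly like the sketch below (a generic stand-in, not the actual http2Client implementation):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosing = errors.New("transport: the connection is closing")

// closer sketches the double-close behaviour the removed
// TestClientServerDuplicatedClose asserted: the first Close succeeds,
// any later Close reports an error instead of panicking.
type closer struct {
	mu     sync.Mutex
	closed bool
}

func (c *closer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosing
	}
	c.closed = true
	// ... release the underlying resources here ...
	return nil
}

func main() {
	var c closer
	fmt.Println(c.Close()) // <nil>
	fmt.Println(c.Close()) // transport: the connection is closing
}
```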