mirror of https://github.com/grpc/grpc-go.git
Merge pull request #874 from menghanl/avoid_streamerror
Fix server side errors
This commit is contained in:
commit
8d57dd3404
2
call.go
2
call.go
|
|
@ -96,7 +96,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
|
|||
}
|
||||
outBuf, err := encode(codec, args, compressor, cbuf)
|
||||
if err != nil {
|
||||
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
|
||||
return nil, Errorf(codes.Internal, "grpc: %v", err)
|
||||
}
|
||||
err = t.Write(stream, outBuf, opts)
|
||||
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
|
||||
|
|
|
|||
|
|
@ -303,10 +303,10 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
|
|||
case compressionNone:
|
||||
case compressionMade:
|
||||
if dc == nil || recvCompress != dc.Type() {
|
||||
return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
|
||||
return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
|
||||
}
|
||||
default:
|
||||
return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
|
||||
return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ func TestToRPCErr(t *testing.T) {
|
|||
// outputs
|
||||
errOut *rpcError
|
||||
}{
|
||||
{transport.StreamErrorf(codes.Unknown, ""), Errorf(codes.Unknown, "").(*rpcError)},
|
||||
{transport.StreamError{codes.Unknown, ""}, Errorf(codes.Unknown, "").(*rpcError)},
|
||||
{transport.ErrConnClosing, Errorf(codes.Internal, transport.ErrConnClosing.Desc).(*rpcError)},
|
||||
} {
|
||||
err := toRPCErr(test.errIn)
|
||||
|
|
@ -173,8 +173,8 @@ func TestContextErr(t *testing.T) {
|
|||
// outputs
|
||||
errOut transport.StreamError
|
||||
}{
|
||||
{context.DeadlineExceeded, transport.StreamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)},
|
||||
{context.Canceled, transport.StreamErrorf(codes.Canceled, "%v", context.Canceled)},
|
||||
{context.DeadlineExceeded, transport.StreamError{codes.DeadlineExceeded, context.DeadlineExceeded.Error()}},
|
||||
{context.Canceled, transport.StreamError{codes.Canceled, context.Canceled.Error()}},
|
||||
} {
|
||||
err := transport.ContextErr(test.errIn)
|
||||
if err != test.errOut {
|
||||
|
|
|
|||
|
|
@ -547,7 +547,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
return err
|
||||
}
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
|
||||
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
||||
}
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
|
|
@ -569,8 +569,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||
|
||||
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
|
||||
switch err := err.(type) {
|
||||
case transport.StreamError:
|
||||
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
|
||||
case *rpcError:
|
||||
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
}
|
||||
default:
|
||||
|
|
|
|||
20
stream.go
20
stream.go
|
|
@ -296,7 +296,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||
}
|
||||
}()
|
||||
if err != nil {
|
||||
return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
|
||||
return Errorf(codes.Internal, "grpc: %v", err)
|
||||
}
|
||||
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
|
||||
}
|
||||
|
|
@ -468,10 +468,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|||
}
|
||||
}()
|
||||
if err != nil {
|
||||
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
|
||||
err = Errorf(codes.Internal, "grpc: %v", err)
|
||||
return err
|
||||
}
|
||||
return ss.t.Write(ss.s, out, &transport.Options{Last: false})
|
||||
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
|
||||
return toRPCErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
||||
|
|
@ -489,5 +492,14 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
|
|||
ss.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize)
|
||||
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
|
||||
if err == io.EOF {
|
||||
return err
|
||||
}
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
||||
}
|
||||
return toRPCErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,7 +66,6 @@ import (
|
|||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -910,7 +909,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) {
|
|||
cc := te.clientConn()
|
||||
wantErr := grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded")
|
||||
if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !equalErrors(err, wantErr) {
|
||||
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
|
||||
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded)
|
||||
}
|
||||
awaitNewConnLogOutput()
|
||||
}
|
||||
|
|
@ -960,7 +959,7 @@ func testHealthCheckServingStatus(t *testing.T, e env) {
|
|||
}
|
||||
wantErr := grpc.Errorf(codes.NotFound, "unknown service")
|
||||
if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !equalErrors(err, wantErr) {
|
||||
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound)
|
||||
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound)
|
||||
}
|
||||
hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING)
|
||||
out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
|
||||
|
|
@ -1213,7 +1212,7 @@ func testMalformedHTTP2Metadata(t *testing.T, e env) {
|
|||
}
|
||||
ctx := metadata.NewContext(context.Background(), malformedHTTP2Metadata)
|
||||
if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Internal {
|
||||
t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %q", ctx, err, codes.Internal)
|
||||
t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1415,7 +1414,7 @@ func testCancelNoIO(t *testing.T, e env) {
|
|||
if grpc.Code(err) == codes.DeadlineExceeded {
|
||||
break
|
||||
}
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
|
||||
}
|
||||
// If there are any RPCs in flight before the client receives
|
||||
// the max streams setting, let them be expired.
|
||||
|
|
@ -1464,7 +1463,7 @@ func testNoService(t *testing.T, e env) {
|
|||
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
|
||||
}
|
||||
if _, err := stream.Recv(); grpc.Code(err) != codes.Unimplemented {
|
||||
t.Fatalf("stream.Recv() = _, %v, want _, error code %d", err, codes.Unimplemented)
|
||||
t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1688,6 +1687,79 @@ func testFailedServerStreaming(t *testing.T, e env) {
|
|||
}
|
||||
}
|
||||
|
||||
// checkTimeoutErrorServer is a gRPC server checks context timeout error in FullDuplexCall().
|
||||
// It is only used in TestStreamingRPCTimeoutServerError.
|
||||
type checkTimeoutErrorServer struct {
|
||||
t *testing.T
|
||||
testpb.TestServiceServer
|
||||
}
|
||||
|
||||
func (s checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
|
||||
for {
|
||||
_, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
// read done.
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
if grpc.Code(err) != codes.DeadlineExceeded {
|
||||
s.t.Fatalf("stream.Recv(_) = _, %v, want error code %s", err, codes.DeadlineExceeded)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := stream.Send(&testpb.StreamingOutputCallResponse{
|
||||
Payload: &testpb.Payload{
|
||||
Body: []byte{'0'},
|
||||
},
|
||||
}); err != nil {
|
||||
if grpc.Code(err) != codes.DeadlineExceeded {
|
||||
s.t.Fatalf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamingRPCTimeoutServerError(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
testStreamingRPCTimeoutServerError(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
// testStreamingRPCTimeoutServerError tests the server side behavior.
|
||||
// When context timeout happens on client side, server should get deadline exceeded error.
|
||||
func testStreamingRPCTimeoutServerError(t *testing.T, e env) {
|
||||
te := newTest(t, e)
|
||||
te.startServer(checkTimeoutErrorServer{t: t})
|
||||
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
|
||||
req := &testpb.StreamingOutputCallRequest{}
|
||||
duration := 100 * time.Millisecond
|
||||
ctx, _ := context.WithTimeout(context.Background(), duration)
|
||||
stream, err := tc.FullDuplexCall(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
|
||||
return
|
||||
}
|
||||
for {
|
||||
err := stream.Send(req)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
_, err = stream.Recv()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for context timeout on server before closing connection.
|
||||
time.Sleep(duration)
|
||||
te.tearDown()
|
||||
}
|
||||
|
||||
// concurrentSendServer is a TestServiceServer whose
|
||||
// StreamingOutputCall makes ten serial Send calls, sending payloads
|
||||
// "0".."9", inclusive. TestServerStreamingConcurrent verifies they
|
||||
|
|
@ -1847,7 +1919,7 @@ func testClientStreamingError(t *testing.T, e env) {
|
|||
continue
|
||||
}
|
||||
if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound {
|
||||
t.Fatalf("%v.CloseAndRecv() = %v, want error %d", stream, err, codes.NotFound)
|
||||
t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
|
@ -1890,7 +1962,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
|
|||
if grpc.Code(err) == codes.DeadlineExceeded {
|
||||
break
|
||||
}
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1930,7 +2002,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
|
|||
if grpc.Code(err) == codes.DeadlineExceeded {
|
||||
break
|
||||
}
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
|
||||
}
|
||||
cancel()
|
||||
|
||||
|
|
@ -1976,7 +2048,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) {
|
|||
Payload: payload,
|
||||
}
|
||||
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Unimplemented {
|
||||
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented)
|
||||
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
|
||||
}
|
||||
// Streaming RPC
|
||||
stream, err := tc.FullDuplexCall(context.Background())
|
||||
|
|
@ -2001,7 +2073,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) {
|
|||
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
|
||||
}
|
||||
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Unimplemented {
|
||||
t.Fatalf("%v.Recv() = %v, want error code %d", stream, err, codes.Unimplemented)
|
||||
t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2084,7 +2156,7 @@ func testUnaryServerInterceptor(t *testing.T, e env) {
|
|||
|
||||
tc := testpb.NewTestServiceClient(te.clientConn())
|
||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.PermissionDenied {
|
||||
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %d", tc, err, codes.PermissionDenied)
|
||||
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2133,7 +2205,7 @@ func testStreamServerInterceptor(t *testing.T, e env) {
|
|||
t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
|
||||
}
|
||||
if _, err := s1.Recv(); grpc.Code(err) != codes.PermissionDenied {
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %d", tc, err, codes.PermissionDenied)
|
||||
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
|
||||
}
|
||||
s2, err := tc.FullDuplexCall(context.Background())
|
||||
if err != nil {
|
||||
|
|
@ -2280,8 +2352,8 @@ func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
|
|||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for error")
|
||||
}
|
||||
if se, ok := got.(transport.StreamError); !ok || se.Code != codes.Canceled {
|
||||
t.Errorf("error = %#v; want transport.StreamError with code Canceled", got)
|
||||
if grpc.Code(got) != codes.Canceled {
|
||||
t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
|
|||
if v := r.Header.Get("grpc-timeout"); v != "" {
|
||||
to, err := decodeTimeout(v)
|
||||
if err != nil {
|
||||
return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
|
||||
return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
|
||||
}
|
||||
st.timeoutSet = true
|
||||
st.timeout = to
|
||||
|
|
@ -393,5 +393,5 @@ func mapRecvMsgError(err error) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
return ConnectionErrorf(true, err, err.Error())
|
||||
return connectionErrorf(true, err, err.Error())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
|||
scheme := "http"
|
||||
conn, err := dial(opts.Dialer, ctx, addr)
|
||||
if err != nil {
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
// Any further errors will close the underlying connection
|
||||
defer func(conn net.Conn) {
|
||||
|
|
@ -165,7 +165,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
|||
// Credentials handshake errors are typically considered permanent
|
||||
// to avoid retrying on e.g. bad certificates.
|
||||
temp := isTemporary(err)
|
||||
return nil, ConnectionErrorf(temp, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(temp, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
ua := primaryUA
|
||||
|
|
@ -205,11 +205,11 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
|||
n, err := t.conn.Write(clientPreface)
|
||||
if err != nil {
|
||||
t.Close()
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
if n != len(clientPreface) {
|
||||
t.Close()
|
||||
return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
||||
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
||||
}
|
||||
if initialWindowSize != defaultWindowSize {
|
||||
err = t.framer.writeSettings(true, http2.Setting{
|
||||
|
|
@ -221,13 +221,13 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
|||
}
|
||||
if err != nil {
|
||||
t.Close()
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
// Adjust the connection flow control window if needed.
|
||||
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
|
||||
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
|
||||
t.Close()
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
go t.controller()
|
||||
|
|
@ -295,12 +295,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||
}
|
||||
pos := strings.LastIndex(callHdr.Method, "/")
|
||||
if pos == -1 {
|
||||
return nil, StreamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
|
||||
return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method)
|
||||
}
|
||||
audience := "https://" + callHdr.Host + port + callHdr.Method[:pos]
|
||||
data, err := c.GetRequestMetadata(ctx, audience)
|
||||
if err != nil {
|
||||
return nil, StreamErrorf(codes.InvalidArgument, "transport: %v", err)
|
||||
return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err)
|
||||
}
|
||||
for k, v := range data {
|
||||
authData[k] = v
|
||||
|
|
@ -437,7 +437,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||
}
|
||||
if err != nil {
|
||||
t.notifyError(err)
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
t.writableChan <- 0
|
||||
|
|
@ -483,7 +483,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
|||
}
|
||||
s.state = streamDone
|
||||
s.mu.Unlock()
|
||||
if _, ok := err.(StreamError); ok {
|
||||
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
|
||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
|
||||
}
|
||||
}
|
||||
|
|
@ -651,7 +651,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
|||
// invoked.
|
||||
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
|
||||
t.notifyError(err)
|
||||
return ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.framer.flushWrite()
|
||||
|
|
@ -699,7 +699,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
|||
func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
size := len(f.Data())
|
||||
if err := t.fc.onData(uint32(size)); err != nil {
|
||||
t.notifyError(ConnectionErrorf(true, err, "%v", err))
|
||||
t.notifyError(connectionErrorf(true, err, "%v", err))
|
||||
return
|
||||
}
|
||||
// Select the right stream to dispatch.
|
||||
|
|
@ -805,7 +805,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
|||
if t.state == reachable || t.state == draining {
|
||||
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
|
||||
t.mu.Unlock()
|
||||
t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
|
||||
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
|
||||
return
|
||||
}
|
||||
select {
|
||||
|
|
@ -814,7 +814,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
|||
// t.goAway has been closed (i.e.,multiple GoAways).
|
||||
if id < f.LastStreamID {
|
||||
t.mu.Unlock()
|
||||
t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
|
||||
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
|
||||
return
|
||||
}
|
||||
t.prevGoAwayID = id
|
||||
|
|
@ -929,7 +929,7 @@ func (t *http2Client) reader() {
|
|||
t.mu.Unlock()
|
||||
if s != nil {
|
||||
// use error detail to provide better err message
|
||||
handleMalformedHTTP2(s, StreamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
|
||||
handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
|
|||
Val: uint32(initialWindowSize)})
|
||||
}
|
||||
if err := framer.writeSettings(true, settings...); err != nil {
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
// Adjust the connection flow control window if needed.
|
||||
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
|
||||
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
|
||||
return nil, ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
|
|
@ -448,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
|
|||
}
|
||||
if err != nil {
|
||||
t.Close()
|
||||
return ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -544,7 +544,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
|||
s.mu.Lock()
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
return StreamErrorf(codes.Unknown, "the stream has been done")
|
||||
return streamErrorf(codes.Unknown, "the stream has been done")
|
||||
}
|
||||
if !s.headerOk {
|
||||
writeHeaderFrame = true
|
||||
|
|
@ -568,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
|||
}
|
||||
if err := t.framer.writeHeaders(false, p); err != nil {
|
||||
t.Close()
|
||||
return ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
}
|
||||
|
|
@ -642,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
|||
}
|
||||
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
|
||||
t.Close()
|
||||
return ConnectionErrorf(true, err, "transport: %v", err)
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
if t.framer.adjustNumWriters(-1) == 0 {
|
||||
t.framer.flushWrite()
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
|
|||
switch f.Name {
|
||||
case "content-type":
|
||||
if !validContentType(f.Value) {
|
||||
d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
|
||||
d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
|
||||
return
|
||||
}
|
||||
case "grpc-encoding":
|
||||
|
|
@ -170,7 +170,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
|
|||
case "grpc-status":
|
||||
code, err := strconv.Atoi(f.Value)
|
||||
if err != nil {
|
||||
d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
|
||||
d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
|
||||
return
|
||||
}
|
||||
d.statusCode = codes.Code(code)
|
||||
|
|
@ -181,7 +181,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
|
|||
var err error
|
||||
d.timeout, err = decodeTimeout(f.Value)
|
||||
if err != nil {
|
||||
d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
|
||||
d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
|
||||
return
|
||||
}
|
||||
case ":path":
|
||||
|
|
|
|||
|
|
@ -476,16 +476,16 @@ type ServerTransport interface {
|
|||
Drain()
|
||||
}
|
||||
|
||||
// StreamErrorf creates an StreamError with the specified error code and description.
|
||||
func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
|
||||
// streamErrorf creates an StreamError with the specified error code and description.
|
||||
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
|
||||
return StreamError{
|
||||
Code: c,
|
||||
Desc: fmt.Sprintf(format, a...),
|
||||
}
|
||||
}
|
||||
|
||||
// ConnectionErrorf creates an ConnectionError with the specified error description.
|
||||
func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
|
||||
// connectionErrorf creates an ConnectionError with the specified error description.
|
||||
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
|
||||
return ConnectionError{
|
||||
Desc: fmt.Sprintf(format, a...),
|
||||
temp: temp,
|
||||
|
|
@ -522,10 +522,10 @@ func (e ConnectionError) Origin() error {
|
|||
|
||||
var (
|
||||
// ErrConnClosing indicates that the transport is closing.
|
||||
ErrConnClosing = ConnectionErrorf(true, nil, "transport is closing")
|
||||
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
|
||||
// ErrStreamDrain indicates that the stream is rejected by the server because
|
||||
// the server stops accepting new RPCs.
|
||||
ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
|
||||
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
|
||||
)
|
||||
|
||||
// StreamError is an error that only affects one stream within a connection.
|
||||
|
|
@ -542,9 +542,9 @@ func (e StreamError) Error() string {
|
|||
func ContextErr(err error) StreamError {
|
||||
switch err {
|
||||
case context.DeadlineExceeded:
|
||||
return StreamErrorf(codes.DeadlineExceeded, "%v", err)
|
||||
return streamErrorf(codes.DeadlineExceeded, "%v", err)
|
||||
case context.Canceled:
|
||||
return StreamErrorf(codes.Canceled, "%v", err)
|
||||
return streamErrorf(codes.Canceled, "%v", err)
|
||||
}
|
||||
panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -414,7 +414,7 @@ func TestLargeMessageSuspension(t *testing.T) {
|
|||
}
|
||||
// Write should not be done successfully due to flow control.
|
||||
err = ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false})
|
||||
expectedErr := StreamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
|
||||
expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
|
||||
if err != expectedErr {
|
||||
t.Fatalf("Write got %v, want %v", err, expectedErr)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue